#%%
import numpy as np
import pandas as pd
import feather
import os, sys, re, ast, csv, math, gc, random, enum, argparse, json, requests, time
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
pd.set_option('display.max_columns', None) # to ensure console display all columns
pd.set_option('display.float_format', '{:0.3f}'.format)
pd.set_option('display.max_rows', 40)
plt.style.use('ggplot')
from pathlib import Path
import joblib
from joblib import dump, load
from collections import defaultdict
from copy import deepcopy
project = 'Financial_Statistical_Analysis'
dirPath = Path(r"E:/")
projectBasePath = Path(r'E:/Projects')
projectPath = projectBasePath / project
os.chdir(projectPath)
sys.path.append(str(projectPath))
dataPath = projectPath / 'data'
pickleDataPath = dataPath / 'pickle'
htmlDataPath = dataPath / 'html'
imageDataPath = dataPath / 'image'
dataInputPath = dataPath / 'input'
dataWorkingPath = dataPath / 'working'
dataOutputPath = dataPath / 'output'
modelPath = projectPath / 'models'
#%% md
We define candle body size to be the absolute value of close - open, and candle total size to be high - low.
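For example, a candle with open 100, close 95, high 102, and low 93 has a body size of |95 - 100| = 5 and a total size of 102 - 93 = 9.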
#%%
data_df = pd.read_csv(dataInputPath / "btcBTC60-min.csv")
data_df['CandleBodySize'] = np.abs(data_df['Close'] - data_df['Open'])
data_df['CandleTotalSize'] = np.abs(data_df['High'] - data_df['Low'])
fig, axes = plt.subplots(1, 2)
fig.set_size_inches(16, 8)
data_df.hist('CandleBodySize', bins=100, color='c', alpha=0.6, ax=axes[0])
data_df.hist('CandleTotalSize', bins=100, color='c', alpha=0.6, ax=axes[1])
print(f"CandleBodySize 80 Percentile: {data_df['CandleBodySize'].quantile(0.80):,.0f}")
print(f"CandleTotalSize 80 Percentile: {data_df['CandleTotalSize'].quantile(0.80):,.0f}")
#%% md
Define "large" candle to be sizes that are approx >= than the 0.8 quantile. In this case, a large body bar is one where the body size is >= 300, and a large total size bar is where total size >= 600.
#%%
data_df['IsLargeBodySize'] = data_df['CandleBodySize'] >= 300
data_df['IsLargeTotalSize'] = data_df['CandleTotalSize'] >= 600
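#%% md
As a quick sanity check (a sketch), the hard-coded cutoffs should flag roughly the top 20% of bars if they track the 80th percentiles computed above:
#%%
# Fraction of bars flagged as large; each should be close to 0.20
# if the hard-coded thresholds match the 80th-percentile values.
print(f"Large body share: {data_df['IsLargeBodySize'].mean():.1%}")
print(f"Large total share: {data_df['IsLargeTotalSize'].mean():.1%}")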
#%% md
A down bar is defined as one whose close is less than the previous close; similarly, an up bar is one whose close is greater than the previous close. <br>
Note that a small number of bars (approx 2%) are neither up nor down bars, because the current close equals the previous close.
#%%
data_df['PrevClose'] = data_df['Close'].shift(1)
data_df['IsDownBar'] = data_df['Close'] < data_df['PrevClose']
data_df['IsUpBar'] = data_df['Close'] > data_df['PrevClose']
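#%% md
We can verify the share of neutral bars directly (a sketch; note the first row, whose previous close is NaN, also counts as neutral here):
#%%
# Bars that are neither up nor down: current close equals previous close.
neutral_share = (~data_df['IsDownBar'] & ~data_df['IsUpBar']).mean()
print(f"Share of bars that are neither up nor down: {neutral_share:.1%}")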
#%% md
We label:
1. Large body up bar as LU
2. Standard up bar as U
3. Large body down bar as LD
4. Standard down bar as D
5. Otherwise label as N
#%%
def get_candle_type(row):
    # Label each bar by direction (up/down) and whether its body is large.
    if row['IsDownBar'] and row['IsLargeBodySize']:
        return 'LD'
    elif row['IsDownBar'] and not row['IsLargeBodySize']:
        return 'D'
    elif row['IsUpBar'] and row['IsLargeBodySize']:
        return 'LU'
    elif row['IsUpBar'] and not row['IsLargeBodySize']:
        return 'U'
    return 'N'
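#%% md
`DataFrame.apply` with `axis=1` is slow on large frames; a vectorized equivalent using `np.select` (a sketch that should produce the same labels) is:
#%%
# Evaluate all four conditions column-wise and pick the matching label;
# rows matching none of them fall through to 'N'.
conditions = [
    data_df['IsDownBar'] & data_df['IsLargeBodySize'],
    data_df['IsDownBar'] & ~data_df['IsLargeBodySize'],
    data_df['IsUpBar'] & data_df['IsLargeBodySize'],
    data_df['IsUpBar'] & ~data_df['IsLargeBodySize'],
]
choices = ['LD', 'D', 'LU', 'U']
candle_type_fast = np.select(conditions, choices, default='N')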
#%% md
Getting the past 2 & 3 candle sequence
#%%
data_df['CandleType'] = data_df.apply(get_candle_type, axis=1)
data_df['CandleSequence2'] = ['_'.join(list(x)) for x in pd.Series(data_df['CandleType'].rolling(2, 1))]
data_df['CandleSequence2'] = data_df['CandleSequence2'].shift(1)
data_df['CandleSequence2'] = data_df['CandleSequence2'].fillna('N')
data_df['CandleSequence3'] = ['_'.join(list(x)) for x in pd.Series(data_df['CandleType'].rolling(3, 1))]
data_df['CandleSequence3'] = data_df['CandleSequence3'].shift(1)
data_df['CandleSequence3'] = data_df['CandleSequence3'].fillna('N')
for col in ['IsDownBar', 'IsUpBar', 'IsLargeBodySize', 'IsLargeTotalSize']:
    data_df[f'{col}Prev'] = data_df[col].shift(1)
    data_df[f'{col}Prev'] = data_df[f'{col}Prev'].fillna(False)
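#%% md
Iterating over a string-valued `rolling` window as above is version-sensitive in pandas; an equivalent shift-based construction for the 2-candle sequence (a sketch; the earliest rows differ slightly because of the min_periods=1 windows) is:
#%%
# Join the previous-previous and previous candle types with '_',
# matching CandleSequence2 for all fully populated rows.
ct = data_df['CandleType']
seq2_alt = (ct.shift(1) + '_' + ct).shift(1).fillna('N')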
#%% md
Getting the Year-Quarter label
#%%
data_df['Date'] = pd.to_datetime(data_df['Date'])
def get_quarter(m):
    # Map a month number (1-12) to its calendar quarter label.
    if m in [1, 2, 3]:
        return 'Q1'
    elif m in [4, 5, 6]:
        return 'Q2'
    elif m in [7, 8, 9]:
        return 'Q3'
    elif m in [10, 11, 12]:
        return 'Q4'
    return 'NA'
data_df['Year'] = data_df['Date'].dt.year
data_df['Month'] = data_df['Date'].dt.month
data_df['Quarter'] = data_df['Month'].apply(get_quarter)
data_df['Period'] = data_df['Year'].astype(str) + '-' + data_df['Quarter']
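#%% md
pandas can derive the same label directly via `Series.dt.quarter` (a sketch producing identical 'YYYY-Qn' strings):
#%%
# Equivalent Year-Quarter label without the helper function.
period_alt = (data_df['Date'].dt.year.astype(str) + '-Q'
              + data_df['Date'].dt.quarter.astype(str))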
#%% md
It seems that, given that the previous bar was a down bar, the current bar is more likely to be an up bar.
The reverse also holds for predicting down bars (given that the previous bar was an up bar), although with lower predictive power.
#%%
downBarCount = data_df.loc[data_df['IsDownBarPrev'], 'IsDownBar'].sum()
upBarCount = data_df.loc[data_df['IsDownBarPrev'], 'IsUpBar'].sum()
totalBarCount = downBarCount + upBarCount
print(f"Given that it is a down bar previously, probability that it is a downbar now: \n {downBarCount / totalBarCount:.1%}")
print(f"Given that it is a down bar previously, probability that it is a upbar now: \n {upBarCount / totalBarCount:.1%}")
downBarCount = data_df.loc[data_df['IsUpBarPrev'], 'IsDownBar'].sum()
upBarCount = data_df.loc[data_df['IsUpBarPrev'], 'IsUpBar'].sum()
totalBarCount = downBarCount + upBarCount
print(f"Given that it is a up bar previously, probability that it is a downbar now: \n {downBarCount / totalBarCount:.1%}")
print(f"Given that it is a up bar previously, probability that it is a upbar now: \n {upBarCount / totalBarCount:.1%}")
#%% md
Given that the previous bar was a large down bar, the next bar is more likely to be an up bar. We can't say the same for the opposite direction. An important observation here is that bar size matters for prediction.
#%%
## Previous Direction + Body Size
downBarCount = data_df.loc[data_df['IsDownBarPrev'] & data_df['IsLargeBodySizePrev'], 'IsDownBar'].sum()
upBarCount = data_df.loc[data_df['IsDownBarPrev'] & data_df['IsLargeBodySizePrev'], 'IsUpBar'].sum()
totalBarCount = downBarCount + upBarCount
print(f"Given that it is a large body down bar previously, probability that it is a downbar now: \n {downBarCount / totalBarCount:.1%}")
print(f"Given that it is a large body down bar previously, probability that it is a upbar now: \n {upBarCount / totalBarCount:.1%}")
downBarCount = data_df.loc[data_df['IsUpBarPrev'] & data_df['IsLargeBodySizePrev'], 'IsDownBar'].sum()
upBarCount = data_df.loc[data_df['IsUpBarPrev'] & data_df['IsLargeBodySizePrev'], 'IsUpBar'].sum()
totalBarCount = downBarCount + upBarCount
print(f"Given that it is a large body up bar previously, probability that it is a downbar now: \n {downBarCount / totalBarCount:.1%}")
print(f"Given that it is a large body up bar previously, probability that it is a upbar now: \n {upBarCount / totalBarCount:.1%}")
#%% md
Compared to total size, body size seems to be a better predictor for up bars. For the rest of the analysis, we will use large body size as the definition of a large bar.
#%%
## Previous Direction + Total Size
downBarCount = data_df.loc[data_df['IsDownBarPrev'] & data_df['IsLargeTotalSizePrev'], 'IsDownBar'].sum()
upBarCount = data_df.loc[data_df['IsDownBarPrev'] & data_df['IsLargeTotalSizePrev'], 'IsUpBar'].sum()
totalBarCount = downBarCount + upBarCount
print(f"Given that it is a large size down bar previously, probability that it is a downbar now: \n {downBarCount / totalBarCount:.1%}")
print(f"Given that it is a large size down bar previously, probability that it is a upbar now: \n {upBarCount / totalBarCount:.1%}")
downBarCount = data_df.loc[data_df['IsUpBarPrev'] & data_df['IsLargeTotalSizePrev'], 'IsDownBar'].sum()
upBarCount = data_df.loc[data_df['IsUpBarPrev'] & data_df['IsLargeTotalSizePrev'], 'IsUpBar'].sum()
totalBarCount = downBarCount + upBarCount
print(f"Given that it is a large size up bar previously, probability that it is a downbar now: \n {downBarCount / totalBarCount:.1%}")
print(f"Given that it is a large size up bar previously, probability that it is a upbar now: \n {upBarCount / totalBarCount:.1%}")
#%% md
We first analyze a sequence size of 2. <br>
Below are the most common sequences, which we use for the analysis. We focus on these sequences because they appear more frequently, which translates into more trading opportunities. <br>
The most promising previous candle sequences for predicting a down bar are:
1. LU_U
2. U_U
3. LU_D
4. U_LU
The most promising previous candle sequences for predicting an up bar are:
1. LD_D
2. D_LD
3. LD_U
4. D_D
This shows that the market follows a contrarian pattern, where large moves typically result in a move in the opposite direction.
We can also conclude that using multiple candles improves predictive power.
#%%
sequence_size = 2
top_n = 5
for direction in ['down', 'up']:
    print(f"Given that the current candle is a {direction} candle, the 10 most common past candle sequence pairs:")
    count_df = data_df.loc[data_df[f'Is{direction.title()}Bar'], f'CandleSequence{sequence_size}'].value_counts()
    print(count_df.head(10))
    pattern_list = list(count_df.head(10).index)
    probability_dict = {}
    for pattern_ in pattern_list:
        downBarCount = data_df.loc[data_df[f'CandleSequence{sequence_size}'] == pattern_, 'IsDownBar'].sum()
        upBarCount = data_df.loc[data_df[f'CandleSequence{sequence_size}'] == pattern_, 'IsUpBar'].sum()
        totalBarCount = downBarCount + upBarCount
        if direction == 'down':
            probability_dict[pattern_] = downBarCount / totalBarCount
        if direction == 'up':
            probability_dict[pattern_] = upBarCount / totalBarCount
    prob_vec = np.array([probability_dict[x] for x in probability_dict])
    idx = np.argsort(prob_vec)
    print()
    print(f"Given that the current candle is a {direction} candle, the {top_n} most interesting patterns among the common sequences:")
    for prob_, pattern_ in zip(prob_vec[idx][::-1][:top_n], np.array(pattern_list)[idx][::-1][:top_n]):
        print(f"Given that the previous candle sequence is {pattern_}, probability that it is a {direction} bar now: {prob_:.1%}")
    print()
#%% md
We perform a similar analysis on sequences of 3 bars.
#%%
sequence_size = 3
top_n = 5
for direction in ['down', 'up']:
    print(f"Given that the current candle is a {direction} candle, the 10 most common past candle sequence triplets:")
    count_df = data_df.loc[data_df[f'Is{direction.title()}Bar'], f'CandleSequence{sequence_size}'].value_counts()
    print(count_df.head(10))
    pattern_list = list(count_df.head(10).index)
    probability_dict = {}
    for pattern_ in pattern_list:
        downBarCount = data_df.loc[data_df[f'CandleSequence{sequence_size}'] == pattern_, 'IsDownBar'].sum()
        upBarCount = data_df.loc[data_df[f'CandleSequence{sequence_size}'] == pattern_, 'IsUpBar'].sum()
        totalBarCount = downBarCount + upBarCount
        if direction == 'down':
            probability_dict[pattern_] = downBarCount / totalBarCount
        if direction == 'up':
            probability_dict[pattern_] = upBarCount / totalBarCount
    prob_vec = np.array([probability_dict[x] for x in probability_dict])
    idx = np.argsort(prob_vec)
    print()
    print(f"Given that the current candle is a {direction} candle, the {top_n} most interesting patterns among the common sequences:")
    for prob_, pattern_ in zip(prob_vec[idx][::-1][:top_n], np.array(pattern_list)[idx][::-1][:top_n]):
        print(f"Given that the previous candle sequence is {pattern_}, probability that it is a {direction} bar now: {prob_:.1%}")
    print()
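#%% md
The two cells above differ only in `sequence_size`; a parameterized version (a sketch, with a hypothetical name) avoids the duplication:
#%%
def sequence_probabilities(df, sequence_size, direction, top_k=10):
    """Return P(direction | pattern) for the top_k most common past sequences."""
    col = f'CandleSequence{sequence_size}'
    common = df.loc[df[f'Is{direction.title()}Bar'], col].value_counts().head(top_k).index
    probs = {}
    for pattern in common:
        sub = df[df[col] == pattern]
        total = sub['IsDownBar'].sum() + sub['IsUpBar'].sum()
        probs[pattern] = sub[f'Is{direction.title()}Bar'].sum() / total
    return probs

print(sequence_probabilities(data_df, 3, 'up'))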
#%% md
We take note of the useful patterns and analyze their predictive performance across quarterly periods. We want patterns whose performance is stable across time.
#%%
useful_pattern_dict = {
    2: {
        'down': ['U_U', 'LU_D', 'U_LU', 'LU_U'],
        'up': ['LD_D', 'D_LD', 'D_D', 'LD_U'],
    },
    3: {
        'down': ['D_U_U', 'LD_U_U', 'U_LD_U', 'U_U_U'],
        'up': ['D_D_LD', 'D_D_D', 'D_LD_U', 'D_U_D'],
    },
}
period_list = sorted(set(data_df['Period']))
output_dict = {}
for sequence_size_ in useful_pattern_dict:
    output_dict[sequence_size_] = {}
    for direction_ in useful_pattern_dict[sequence_size_]:
        output_dict[sequence_size_][direction_] = {}
        for pattern_ in useful_pattern_dict[sequence_size_][direction_]:
            output_dict[sequence_size_][direction_][pattern_] = {}
            for period_ in period_list:
                data_df_period = data_df[data_df['Period'] == period_].copy()
                downBarCount = data_df_period.loc[data_df_period[f'CandleSequence{sequence_size_}'] == pattern_, 'IsDownBar'].sum()
                upBarCount = data_df_period.loc[data_df_period[f'CandleSequence{sequence_size_}'] == pattern_, 'IsUpBar'].sum()
                totalBarCount = downBarCount + upBarCount
                if direction_ == 'down':
                    probability_ = downBarCount / totalBarCount
                if direction_ == 'up':
                    probability_ = upBarCount / totalBarCount
                output_dict[sequence_size_][direction_][pattern_][period_] = probability_
for sequence_size_ in useful_pattern_dict:
    for direction_ in useful_pattern_dict[sequence_size_]:
        for pattern_ in useful_pattern_dict[sequence_size_][direction_]:
            x_ = list(output_dict[sequence_size_][direction_][pattern_].keys())
            y_ = [output_dict[sequence_size_][direction_][pattern_][x] for x in x_]
            fig = plt.figure()
            ax = fig.add_axes([0, 0, 1, 1])
            ax.bar(x_, y_)
            plt.axhline(y=0.5, color='b', linestyle='--')
            ax.set_title(f"Quarterly prediction probability for {direction_.upper()} bar when past sequence is {pattern_}")
            fig.set_size_inches(16, 8)
            plt.show()
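#%% md
"Stable across time" can also be quantified instead of judged visually (a sketch): the mean quarterly probability minus one standard deviation gives a crude worst-case score per pattern.
#%%
# Summarize each pattern's quarterly probabilities; NaNs arise in quarters
# where the pattern never occurs and are dropped before aggregating.
for size_, directions in output_dict.items():
    for direction_, patterns in directions.items():
        for pattern_, by_period in patterns.items():
            vals = np.array(list(by_period.values()), dtype=float)
            vals = vals[~np.isnan(vals)]
            score = vals.mean() - vals.std()
            print(f"{direction_:>4} {pattern_:<8} mean={vals.mean():.1%} std={vals.std():.1%} score={score:.1%}")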
#%% md
For predicting down bars, the patterns that exhibit stable performance across time are:
1. U_U
2. LU_U
3. D_U_U
4. U_U_U
Note that since both D_U_U and U_U_U are predictive, a 2-candle sequence of U_U is sufficient for predicting down bars.
For predicting up bars, the patterns that exhibit stable performance across time are:
1. D_LD ?
2. D_D ?
3. D_D_LD ?
4. D_U_D
It seems that predicting up bars is more difficult and less stable. This is because the analysis above is based on a reversal strategy; when the market crashes, momentum strategies are more useful. Hence, the above analysis needs to be supplemented by context features that represent the market state for better and more robust predictions.