Created
November 6, 2022 17:39
-
-
Save 18182324/7fd77218730c576495ae8fd1660dbad5 to your computer and use it in GitHub Desktop.
Development Framework for Trading and Backtesting Trading Strategies
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Load libraries | |
import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
from pandas import read_csv, set_option | |
from pandas.plotting import scatter_matrix | |
import seaborn as sns | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.model_selection import train_test_split, KFold, cross_val_score, GridSearchCV | |
from sklearn.linear_model import LogisticRegression | |
from sklearn.tree import DecisionTreeClassifier | |
from sklearn.neighbors import KNeighborsClassifier | |
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis | |
from sklearn.naive_bayes import GaussianNB | |
from sklearn.svm import SVC | |
from sklearn.neural_network import MLPClassifier | |
from sklearn.pipeline import Pipeline | |
from sklearn.ensemble import AdaBoostClassifier, GradientBoostingClassifier, RandomForestClassifier, ExtraTreesClassifier | |
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score | |
#Libraries for Deep Learning Models | |
from keras.models import Sequential | |
from keras.layers import Dense | |
from keras.wrappers.scikit_learn import KerasClassifier | |
from keras.optimizers import SGD | |
# load dataset
# download dataset here: https://www.dropbox.com/s/m7y0ezifl3c9hbh/BitstampData.csv.zip?dl=0
dataset = pd.read_csv('BitstampData_sample.csv')

# Disable the warnings
import warnings
warnings.filterwarnings('ignore')

#Descriptive Statistics
# NOTE: the bare expressions below (shape/tail/describe) only display output
# in a notebook; as a plain script they are no-ops.
# shape
dataset.shape
# peek at data
set_option('display.width', 100)
dataset.tail(5)
# describe data
# FIX: the bare 'precision' option alias was removed in pandas 2.0 and now
# raises OptionError; use the fully-qualified option name.
set_option('display.precision', 3)
dataset.describe()
#Data Cleaning
#Check for null values, then forward-fill any gaps in the data
print('Null Values =', dataset.isnull().values.any())
dataset = dataset.ffill()
dataset = dataset.drop(columns=['Timestamp'])

#Preparing the data for classification
#Each row is labelled by comparing a short- and a long-horizon average:
# 1.0 -> the short-term average is above the long-term one (price expected up)
# 0.0 -> the short-term average is at or below the long-term one (expected down)

# Short simple moving average (10-period window; min_periods=1 avoids leading NaNs)
dataset['short_mavg'] = dataset['Close'].rolling(window=10, min_periods=1).mean()
# Long simple moving average (60-period window)
dataset['long_mavg'] = dataset['Close'].rolling(window=60, min_periods=1).mean()
# Trading signal: 1.0 wherever the short average exceeds the long average
dataset['signal'] = np.where(dataset['short_mavg'] > dataset['long_mavg'], 1.0, 0.0)
dataset.tail()
#Feature Engineering - Calculating Technical Indicators
#calculation of exponential moving average
def EMA(df, n):
    """Return the n-period exponential moving average of df['Close'].

    Uses pandas ewm with span=n; min_periods=n leaves the first n-1
    values as NaN. The returned Series is named 'EMA_<n>'.
    """
    smoothed = df['Close'].ewm(span=n, min_periods=n).mean()
    return smoothed.rename('EMA_' + str(n))
# Exponential moving averages at short / medium / long horizons
dataset['EMA10'] = EMA(dataset, 10)
dataset['EMA30'] = EMA(dataset, 30)
dataset['EMA200'] = EMA(dataset, 200)
dataset.head()
#calculation of rate of change
def ROC(df, n):
    """Rate of change (%) of a price series.

    Keeps the original convention of comparing each value with the one
    (n-1) steps earlier: 100 * (x_t - x_{t-(n-1)}) / x_{t-(n-1)}.
    The returned Series is named 'ROC_<n>'; the first n-1 values are NaN.
    """
    lag = n - 1
    change = df.diff(lag)
    base = df.shift(lag)
    return (100 * change / base).rename('ROC_' + str(n))
# Rate of change over 10- and 30-period horizons
dataset['ROC10'] = ROC(dataset['Close'], 10)
dataset['ROC30'] = ROC(dataset['Close'], 30)
#Calculation of price momentum
def MOM(df, n):
    """Price momentum: each value minus the value n periods earlier.

    The returned Series is named 'Momentum_<n>'; the first n values are NaN.
    """
    return df.diff(n).rename('Momentum_' + str(n))
# Momentum over 10- and 30-period horizons
dataset['MOM10'] = MOM(dataset['Close'], 10)
dataset['MOM30'] = MOM(dataset['Close'], 30)
#calculation of relative strength index
def RSI(series, period):
    """Relative Strength Index of a price series (Wilder-style smoothing).

    Builds separate gain (u) and loss (d) series from one-step price
    changes, seeds each with the simple mean of the first `period`
    changes, then smooths with an EWM using com=period-1 (equivalent to
    Wilder's alpha = 1/period). Returns values in [0, 100]; the result
    is shorter than the input because the first `period` rows are
    consumed by the diff and the seed.
    """
    delta = series.diff().dropna()
    # u collects positive changes (gains), d collects magnitudes of losses
    u = delta * 0
    d = u.copy()
    u[delta > 0] = delta[delta > 0]
    d[delta < 0] = -delta[delta < 0]
    # seed the smoothing: overwrite the period-th entry with the simple
    # average of the first `period` changes, then drop the earlier rows
    u[u.index[period-1]] = np.mean( u[:period] ) #first value is average of gains
    u = u.drop(u.index[:(period-1)])
    d[d.index[period-1]] = np.mean( d[:period] ) #first value is average of losses
    d = d.drop(d.index[:(period-1)])
    # relative strength = smoothed average gain / smoothed average loss
    rs = u.ewm(com=period-1, adjust=False).mean() / \
         d.ewm(com=period-1, adjust=False).mean()
    return 100 - 100 / (1 + rs)
# RSI at short / medium / long horizons
dataset['RSI10'] = RSI(dataset['Close'], 10)
dataset['RSI30'] = RSI(dataset['Close'], 30)
dataset['RSI200'] = RSI(dataset['Close'], 200)
#calculation of stochastic oscillator (fast %K)
def STOK(close, low, high, n):
    """Fast stochastic %K over an n-period window.

    %K = 100 * (close - lowest low) / (highest high - lowest low),
    where the extremes are taken over the trailing n periods.
    The first n-1 values are NaN.
    """
    lowest_low = low.rolling(n).min()
    highest_high = high.rolling(n).max()
    return 100 * (close - lowest_low) / (highest_high - lowest_low)
def STOD(close, low, high, n):
    """Slow stochastic %D: 3-period simple moving average of fast %K.

    Recomputes %K over the trailing n periods and smooths it with a
    3-period rolling mean; the first n+1 values are NaN.
    """
    lowest_low = low.rolling(n).min()
    highest_high = high.rolling(n).max()
    fast_k = 100 * (close - lowest_low) / (highest_high - lowest_low)
    return fast_k.rolling(3).mean()
# Fast (%K) and slow (%D) stochastic oscillators at three horizons
dataset['%K10'] = STOK(dataset['Close'], dataset['Low'], dataset['High'], 10)
dataset['%D10'] = STOD(dataset['Close'], dataset['Low'], dataset['High'], 10)
dataset['%K30'] = STOK(dataset['Close'], dataset['Low'], dataset['High'], 30)
dataset['%D30'] = STOD(dataset['Close'], dataset['Low'], dataset['High'], 30)
dataset['%K200'] = STOK(dataset['Close'], dataset['Low'], dataset['High'], 200)
dataset['%D200'] = STOD(dataset['Close'], dataset['Low'], dataset['High'], 200)
#Calculation of moving average
def MA(df, n):
    """Simple n-period moving average of df['Close'].

    The returned Series is named 'MA_<n>'; min_periods=n leaves the
    first n-1 values as NaN.
    """
    rolling_mean = df['Close'].rolling(window=n, min_periods=n).mean()
    return rolling_mean.rename('MA_' + str(n))
# Simple moving averages at the same horizons as the other indicators.
# FIX: the original labelled these columns MA21/MA63/MA252 (daily-bar
# windows copied from another dataset) while actually computing
# 10/30/200-period averages; the column names now match the windows.
dataset['MA10'] = MA(dataset, 10)
dataset['MA30'] = MA(dataset, 30)
dataset['MA200'] = MA(dataset, 200)
dataset.tail()

#excluding columns that are not needed for our prediction.
dataset = dataset.drop(['High', 'Low', 'Open', 'Volume_(Currency)', 'short_mavg', 'long_mavg'], axis=1)
# drop the warm-up rows that still contain NaNs from the rolling indicators
dataset = dataset.dropna(axis=0)
dataset.tail()
#Visualizing Data
# line plot of the volume-weighted price series
dataset[['Weighted_Price']].plot(grid=True)
plt.show()
#Histograms of every remaining feature
dataset.hist(sharex=False, sharey=False, xlabelsize=1, ylabelsize=1, figsize=(12,12))
plt.show()
# class balance of the up/down signal label
fig = plt.figure()
plot = dataset.groupby(['signal']).size().plot(kind='barh', color='red')
plt.show()
#Correlation Matrix heatmap across all features
correlation = dataset.corr()
plt.figure(figsize=(15,15))
plt.title('Correlation Matrix')
sns.heatmap(correlation, vmax=1, square=True,annot=True,cmap='cubehelix')
#Train Test and Split - Evaluating Data
# split out validation dataset for the end; keep only the most recent
# 100,000 rows to keep training tractable
subset_dataset = dataset.iloc[-100000:]
Y = subset_dataset["signal"]
X = subset_dataset.loc[:, dataset.columns != 'signal']
validation_size = 0.2
seed = 1
# FIX: pass the `seed` variable instead of a duplicated literal 1 so that
# changing `seed` actually changes the split (behavior identical for seed=1).
# NOTE(review): train_test_split shuffles by default, which leaks future
# information for time-ordered data — consider shuffle=False for a
# chronological hold-out.
X_train, X_validation, Y_train, Y_validation = train_test_split(X, Y, test_size=validation_size, random_state=seed)

#Test options
# test options for classification
num_folds = 10
seed = 7
scoring = 'accuracy'
#scoring = 'precision'
#scoring = 'recall'
#scoring ='neg_log_loss'
#scoring = 'roc_auc'
#Compare Models
# spot check the algorithms: (name, unfitted estimator) pairs
models = []
models.append(('LR', LogisticRegression(n_jobs=-1)))
models.append(('LDA', LinearDiscriminantAnalysis()))
models.append(('KNN', KNeighborsClassifier()))
models.append(('CART', DecisionTreeClassifier()))
models.append(('NB', GaussianNB()))
#Neural Network
models.append(('NN', MLPClassifier()))
#Ensemble Models
# Boosting methods
models.append(('AB', AdaBoostClassifier()))
models.append(('GBM', GradientBoostingClassifier()))
# Bagging methods
models.append(('RF', RandomForestClassifier(n_jobs=-1)))
#K-folds cross validation: score every candidate model with the same folds
results = []
names = []
for name, model in models:
    # FIX: sklearn >= 0.24 raises ValueError when random_state is set while
    # shuffle is False; shuffle explicitly so the seed is meaningful.
    kfold = KFold(n_splits=num_folds, shuffle=True, random_state=seed)
    cv_results = cross_val_score(model, X_train, Y_train, cv=kfold, scoring=scoring)
    results.append(cv_results)
    names.append(name)
    msg = "%s: %f (%f)" % (name, cv_results.mean(), cv_results.std())
    print(msg)

# compare algorithms: box plot of the CV score distribution per model
fig = plt.figure()
fig.suptitle('Algorithm Comparison')
ax = fig.add_subplot(111)
plt.boxplot(results)
ax.set_xticklabels(names)
fig.set_size_inches(15,8)
plt.show()
#Model Tuning and Grid Search
# Grid Search: Random Forest Classifier
# FIX: the original parameter notes described GradientBoosting ("number of
# boosting stages", default=100/3); corrected to RandomForest semantics.
'''
n_estimators : int
    The number of trees in the forest; more trees generally improve
    accuracy at higher computational cost.
max_depth : int
    Maximum depth of each tree; limits the number of nodes. Tune for best
    performance - the best value depends on the interaction of the inputs.
criterion : str
    The function to measure the quality of a split: "gini" for the Gini
    impurity and "entropy" for the information gain.
'''
# standardize the features before the search
scaler = StandardScaler().fit(X_train)
rescaledX = scaler.transform(X_train)
n_estimators = [20, 80]
max_depth = [5, 10]
criterion = ["gini", "entropy"]
param_grid = dict(n_estimators=n_estimators, max_depth=max_depth, criterion=criterion)
model = RandomForestClassifier(n_jobs=-1)
# FIX: shuffle=True is required when random_state is set (sklearn >= 0.24)
kfold = KFold(n_splits=num_folds, shuffle=True, random_state=seed)
grid = GridSearchCV(estimator=model, param_grid=param_grid, scoring=scoring, cv=kfold)
grid_result = grid.fit(rescaledX, Y_train)

#Print Results: best combination, then every combination ranked
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
ranks = grid_result.cv_results_['rank_test_score']
for mean, stdev, param, rank in zip(means, stds, params, ranks):
    print("#%d %f (%f) with: %r" % (rank, mean, stdev, param))
#Results
#prepare the final model with the best hyper-parameters from the grid search
# NOTE(review): the grid search above was fit on standardized features
# (rescaledX) while this final model is fit on raw X_train — confirm the
# asymmetry is intentional (tree ensembles are largely scale-insensitive,
# but the tuned parameters were selected on scaled data).
model = RandomForestClassifier(criterion='gini', n_estimators=80,max_depth=10,n_jobs=-1) # rbf is default kernel
#model = LogisticRegression()
model.fit(X_train, Y_train)
#estimate accuracy on validation set
predictions = model.predict(X_validation)
print(accuracy_score(Y_validation, predictions))
print(confusion_matrix(Y_validation, predictions))
print(classification_report(Y_validation, predictions))
# confusion matrix rendered as a labelled heatmap
df_cm = pd.DataFrame(confusion_matrix(Y_validation, predictions), columns=np.unique(Y_validation), index = np.unique(Y_validation))
df_cm.index.name = 'Actual'
df_cm.columns.name = 'Predicted'
sns.heatmap(df_cm, cmap="Blues", annot=True,annot_kws={"size": 16})# font sizes

#Analyze feature importance (as a percentage, sorted ascending for the bar chart)
Importance = pd.DataFrame({'Importance':model.feature_importances_*100}, index=X.columns)
Importance.sort_values('Importance', axis=0, ascending=True).plot(kind='barh', color='r' )
plt.xlabel('Variable Importance')
#Perform Backtest
#Strategy returns: the per-period market return multiplied by the signal
#held at the close of the previous period.
# NOTE(review): X_validation comes from a shuffled train_test_split, so its
# rows are not in time order — pct_change()/shift(1) over shuffled rows do
# not represent a real trading sequence; re-split with shuffle=False for a
# meaningful backtest.
backtestdata = pd.DataFrame(index=X_validation.index)
#backtestdata = pd.DataFrame()
backtestdata['signal_pred'] = predictions
backtestdata['signal_actual'] = Y_validation
backtestdata['Market Returns'] = X_validation['Close'].pct_change()
# returns earned if the true / predicted signal of the prior row was held
backtestdata['Actual Returns'] = backtestdata['Market Returns'] * backtestdata['signal_actual'].shift(1)
backtestdata['Strategy Returns'] = backtestdata['Market Returns'] * backtestdata['signal_pred'].shift(1)
backtestdata=backtestdata.reset_index()
backtestdata.head()
# cumulative return distributions and equity-curve comparison
backtestdata[['Strategy Returns','Actual Returns']].cumsum().hist()
backtestdata[['Strategy Returns','Actual Returns']].cumsum().plot()
#end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment