import dshelper as dsh
import instrumentdb as idb
import logging
import numpy as np
import os
import pandas as pd
import psutil
import sys
import tensorflow as tf
import time
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis as QDA
from sqlalchemy import create_engine, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Float, ForeignKey, UniqueConstraint
from sqlalchemy.orm import sessionmaker
DeclarativeBase = declarative_base()

class Model(DeclarativeBase):
    __tablename__ = 'models'

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)

    __table_args__ = (UniqueConstraint('name', name='unco1'),)


class Forecast(DeclarativeBase):
    __tablename__ = 'forecasts'

    id = Column(Integer, primary_key=True)
    model = Column(Integer, ForeignKey('models.id'), nullable=False)
    symbol = Column(String)
    ts = Column(DateTime)
    fore = Column(Float)
    details = Column(String)

    __table_args__ = (UniqueConstraint('model', 'ts', 'symbol', name='unco1'),)
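
# The two ORM classes above form a minimal schema for persisting walk-forward
# results: one row per model name in 'models', one row per (model, ts, symbol)
# forecast in 'forecasts', with the unique constraints acting as the upsert keys
# used in WalkForwardLoop.run below. A hypothetical read-back query (not part of
# the original script, shown only as a commented sketch) could look like:
#
#     engine = create_engine('sqlite:///ml.sqlite')
#     session = sessionmaker(bind=engine)()
#     rows = (session.query(Forecast)
#                    .join(Model, Model.id == Forecast.model)
#                    .filter(Model.name == 'tf_conv')
#                    .order_by(Forecast.ts)
#                    .all())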

class WalkForwardLoop:
    def __init__(self, model_name, log_file=None, classifier=None, index_format='%Y-%m-%d', db_url=None, scale=True,
                 verbose=False, tensorflow=True):
        self.model_name = model_name    # The model name to use for the database
        self.classifier = classifier    # The classifier object
        self.log_file = log_file
        self.index_format = index_format
        self.db_url = db_url
        self.db_session = None
        self.scale = scale
        if self.db_url is not None:
            self.init_db()
        self.verbose = verbose
        self.tensorflow = tensorflow

    def init_db(self):
        engine = create_engine(self.db_url)
        DeclarativeBase.metadata.create_all(engine)
        Session = sessionmaker(bind=engine)
        self.db_session = Session()
        # Insert-or-ignore: the unique constraint on Model.name rejects duplicates,
        # in which case we roll back and simply look the model up below.
        try:
            self.db_session.add(Model(name=self.model_name))
            self.db_session.commit()
        except Exception:
            self.db_session.rollback()
        self.model_id = self.db_session.query(Model.id).filter(Model.name == self.model_name).first()[0]

    def run(self, features, response, forecast_locations, max_history=1e6, symbol_column=None, tensorflow=None,
            verbose=None):
        assert len(features) == len(response)

        if isinstance(verbose, bool):
            self.verbose = verbose
        if isinstance(tensorflow, bool):
            self.tensorflow = tensorflow

        db_session = None
        if self.db_url is not None:
            self.init_db()

        timer = time.clock if sys.platform == 'win32' else time.time

        for ii in range(0, forecast_locations.len()):
            # Prepare the range for training for this iteration
            history_end = forecast_locations.starts[ii]
            history_start = 0
            if (history_end - history_start + 1) > max_history:
                history_start = history_end - max_history + 1

            xx = features.iloc[history_start:history_end].as_matrix()
            yy = response.iloc[history_start:history_end].as_matrix()

            # Scale the data
            if self.scale:
                std_scaler = StandardScaler()
                xx = std_scaler.fit_transform(xx)

            fore_xx = features.iloc[forecast_locations.starts[ii]:(forecast_locations.ends[ii] + 1)].as_matrix()
            if self.scale:
                fore_xx = std_scaler.transform(fore_xx)

            # Train the model and predict
            start = timer()
            if self.tensorflow:
                fore = self.tensorflow_fit_predict(xx, yy, fore_xx)
            else:
                fore = self.classifier.fit_predict(xx, yy, fore_xx)
            forecasting_time = timer() - start

            fore_df = pd.DataFrame(fore, index=features.iloc[
                forecast_locations.starts[ii]:(forecast_locations.ends[ii] + 1)].index)
            # Generate proper column names. Map -1/1 to 'short'/'long'. The last column is the class.
            # fore_df.columns = np.append(np.array(['short','long'])[self.classes.astype(int) + 1], ['class'])
            fore_df.ix[:, 2] = np.where(fore_df.ix[:, 2] == -1, 'short', 'long')
            fore_df.columns = np.array(['short_prob', 'long_prob', 'class'])
            # print(fore_df)
            fore = fore[:, 2]
            # The reported metric is the winning class probability
            metric = np.round(fore_df.iloc[:, 0:2].max(axis=1), 2)
            # Save results to a database or somewhere else
            if self.db_session is not None:
                for jj in range(len(fore)):
                    row_id = forecast_locations.starts[ii] + jj
                    ts = features.index[row_id]
                    details = fore_df.iloc[[jj]].to_json(orient='split', date_format='iso')
                    if symbol_column is not None:
                        symbol = symbol_column[row_id]
                        rs = self.db_session.query(Forecast.id).filter(Forecast.ts == ts).filter(
                            Forecast.model == self.model_id).filter(Forecast.symbol == symbol).first()
                        if rs is None:
                            ff = Forecast(model=self.model_id, ts=ts, fore=fore[jj], details=details, symbol=symbol)
                            self.db_session.add(ff)
                        else:
                            ff = Forecast(id=rs[0], model=self.model_id, ts=ts, fore=fore[jj], details=details,
                                          symbol=symbol)
                            self.db_session.merge(ff)
                    else:
                        rs = self.db_session.query(Forecast.id).filter(Forecast.ts == ts).filter(
                            Forecast.model == self.model_id).first()
                        if rs is None:
                            ff = Forecast(model=self.model_id, ts=ts, fore=fore[jj], details=details)
                            self.db_session.add(ff)
                        else:
                            ff = Forecast(id=rs[0], model=self.model_id, ts=ts, fore=fore[jj], details=details)
                            self.db_session.merge(ff)

            # Log output
            if self.log_file is not None:
                out_str = "\n" + features.index[forecast_locations.starts[ii]].strftime(self.index_format) + " - " + \
                          features.index[forecast_locations.ends[ii]].strftime(self.index_format) + "\n" + \
                          "=======================\n" + \
                          " history: from: " + features.index[history_start].strftime(self.index_format) + ", to: " + \
                          features.index[history_end - 1].strftime(self.index_format) + \
                          ", length: " + str(history_end - history_start) + "\n" + \
                          " forecast length: " + str(forecast_locations.ends[ii] - forecast_locations.starts[ii] + 1) + "\n" + \
                          " forecast: [" + ','.join(str(round(ff, 2)) for ff in fore) + "]\n" + \
                          " probs: [" + ','.join(str(round(mm, 2)) for mm in metric) + "]\n" + \
                          " time [training+forecasting]: " + str(round(forecasting_time, 2)) + " secs\n"
                with open(self.log_file, "a") as log_fh:
                    print(out_str, file=log_fh)

            if self.db_session is not None:
                self.db_session.commit()

    def tensorflow_fit_predict(self, x, y, newx):
        learning_rate = 0.01
        batch_size = 'auto'
        num_passes = 2
        display_step = 1

        if isinstance(batch_size, str):
            if batch_size == 'auto':
                batch_size = min(200, x.shape[0])
            else:
                raise ValueError("'auto' is the only acceptable string for batch_size")

        num_batches = x.shape[0] // batch_size
        # print("num_batches = {0}, batch_size = {1}, x.shape = {2}".format(num_batches, batch_size, x.shape))

        # Map the y's to [0, nlevels)
        classes = np.sort(np.unique(y))
        self.classes = classes
        yz = np.searchsorted(classes, y)

        # One hot encode them
        ohe = OneHotEncoder(n_values=len(classes), sparse=False)
        yy = ohe.fit_transform(yz)
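        # For the -1/+1 response used further below, the mapping works out as a
        # small worked example (values here are illustrative only):
        #   classes = [-1, 1]
        #   y  = [[1], [-1], [1]]   ->  yz = [[1], [0], [1]]   (indices into classes)
        #   yz -> one-hot yy = [[0, 1], [1, 0], [0, 1]]
        # np.searchsorted maps each label to its position in the sorted class array,
        # and OneHotEncoder turns those positions into indicator columns.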
        res = None
        nfeatures = x.shape[1]
        nlabels = yy.shape[1]

        # Define the tensorflow graph. A new graph for each iteration. Otherwise, all iterations use
        # the default graph, and the memory usage explodes quickly.
        with tf.Graph().as_default():
            input = tf.placeholder(tf.float32, [None, nfeatures])
            label = tf.placeholder(tf.float32, [None, nlabels])

            nconv1 = 32
            cw1 = tf.Variable(tf.random_normal([1, 3, 1, nconv1]))
            cb1 = tf.Variable(tf.random_normal([nconv1]))
            conv_input = tf.reshape(input, shape=[-1, 1, nfeatures, 1])
            cl1 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(conv_input, cw1, strides=[1, 1, 1, 1], padding='SAME'), cb1))
            mp1 = tf.nn.max_pool(cl1, ksize=[1, 1, 2, 1], strides=[1, 1, 2, 1], padding='SAME')
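            # Shape flow (inferred from the ops above, not stated in the original):
            #   input      (batch, nfeatures)
            #   conv_input (batch, 1, nfeatures, 1)
            #   cl1        (batch, 1, nfeatures, 32)     1x3 conv, SAME padding
            #   mp1        (batch, 1, nfeatures/2, 32)   1x2 max pool halves the width
            # The hard-coded 378*32 input size of the first dense layer below therefore
            # assumes nfeatures == 756 (three years of daily returns, 756 / 2 == 378).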
            nhidden1 = 128
            w1 = tf.Variable(tf.random_normal([378 * 32, nhidden1]))
            b1 = tf.Variable(tf.random_normal([nhidden1]))
            fc_input = tf.reshape(mp1, [-1, w1.get_shape().as_list()[0]])
            l1 = tf.nn.relu(tf.add(tf.matmul(fc_input, w1), b1))

            nhidden2 = 128
            w2 = tf.Variable(tf.random_normal([nhidden1, nhidden2]))
            b2 = tf.Variable(tf.random_normal([nhidden2]))
            l2 = tf.nn.relu(tf.add(tf.matmul(l1, w2), b2))

            w3 = tf.Variable(tf.random_normal([nhidden2, nlabels]))
            b3 = tf.Variable(tf.random_normal([nlabels]))
            model = tf.add(tf.matmul(l2, w3), b3)

            cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=model, labels=label))
            optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)

            correct_fores = tf.equal(tf.argmax(model, 1), tf.argmax(label, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_fores, tf.float32))

            # Initializing the variables
            init_op = tf.global_variables_initializer()

            # Train our neural network
            all_features = np.array_split(x, num_batches)
            all_labels = np.array_split(yy, num_batches)

            # Launch the graph
            with tf.Session() as sess:
                sess.run(init_op)

                avg_cost = 0.
                total_batches = num_batches * num_passes

                # Log some data
                first_shape = all_features[0].shape
                last_shape = all_features[len(all_features) - 1].shape
                logging.debug("total_batches = {0}, first shape = {1}, last shape = {2}, x shape = {3}, y shape = {4}".format(
                    total_batches, first_shape, last_shape, x.shape, yy.shape))
                process = psutil.Process(os.getpid())
                logging.debug(process.memory_info().rss)

                for ii in range(total_batches):
                    features = np.ascontiguousarray(all_features[ii % num_batches])
                    labels = np.ascontiguousarray(all_labels[ii % num_batches])
                    _, cc, acc = sess.run([optimizer, cost, accuracy], feed_dict={input: features, label: labels})
                    if ii % display_step == 0 and self.verbose:
                        logging.info("Minibatch: {0:04d}, Loss: {1:.4f}, Accuracy: {2:.2f}%".format(ii + 1, cc, acc * 100))

                # Predict
                out = tf.nn.softmax(model)
                probs = sess.run(out, feed_dict={input: newx})
                # probs = sess.run(tf.argmax(model, 1), feed_dict={input: newx})
                # print(probs)
                if len(probs.shape) == 1:
                    probs = np.reshape(probs, (1, -1))
                # Append the resulting class to the probabilities. The class indices are
                # reshaped to a column so the append also works when more than one row
                # is forecast at once.
                res = np.append(probs, self.classes[np.argmax(probs, 1)].reshape(-1, 1), axis=1)

        return res

def stack_series(all_data, series):
    res = None
    for ss in series:
        tt = pd.concat([all_data[ss]['full']['entry'], all_data[ss]['features']], axis=1).dropna()
        # tt['symbol'] = pd.Series(ss, index=tt.index)
        tt.insert(0, 'symbol', pd.Series(ss, index=tt.index))
        if res is None:
            res = tt
        else:
            res = res.append(tt)
    res = res.sort_index()
    # res = res.sort(['ts','symbol'])
    return res
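
# The stacked frame has one row per (date, symbol) pair: a 'symbol' column first,
# then the response ('entry') and then the feature columns, all sorted by date.
# drive_wfl below splits it back into symbol_column, response and features in
# that order.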

def drive_wfl():
    # symbols = ['HO2']
    symbols = ['HO2', 'CL2']

    all_data = dsh.load('all_data.bin')
    combined = stack_series(all_data, symbols)
    # print(combined['symbol'].tail(20))

    symbol_column = None
    if 'symbol' in combined.columns:
        symbol_column = combined['symbol']
        combined.drop('symbol', axis=1, inplace=True)

    response = combined.iloc[:, 0]
    features = combined.iloc[:, 1:]

    fl = dsh.ForecastLocations(features.index)
    ml = WalkForwardLoop('qda', 'ml.log', db_url='sqlite:///ml.sqlite')
    ml.run(features, response, fl, symbol_column=symbol_column)

def extend_price(ts1, ts2):
    first_index = np.where(ts2.index == ts1.index[0])
    if len(first_index) != 1 or len(first_index[0]) != 1:
        raise ValueError('Failed to find the index to stitch the series')
    first_index = first_index[0][0]

    res = ts2.pct_change().shift(-1)[:first_index].append(ts1)

    # Walk the series backwards, building the prices from the returns
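    # Worked example (illustrative numbers): if res[ii] == 100.0 and the stored
    # shifted return at ii-1 is 0.25 (i.e. ts2 gained 25% going into bar ii), then
    # res[ii-1] = 100.0 / (1.0 + 0.25) = 80.0, so 80 -> 100 reproduces the +25%
    # move while the spliced series keeps ts1's price level.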
    for ii in range(first_index, 0, -1):
        res[ii - 1] = res[ii] / (1.0 + res[ii - 1])
    return res

def pinnacle_csv(csv_path):
    ss = pd.read_csv(csv_path, header=None, parse_dates=True, index_col=0)
    ss = ss.ix[:, 0:4]
    ss.columns = ['open', 'high', 'low', 'close']
    return ss

def returns_wfl():
    ho_pin = pinnacle_csv('d:/DATA/CLCDATA/HO_REV.CSV')
    ho_pin = ho_pin.ix[:, 3]

    db = idb.CsiDb()
    ho_csi = db.load_bars('HO2')
    ho_csi = ho_csi.ix[:, 3]

    ho_ext = np.round(extend_price(ho_csi, ho_pin), 4)

    rets = ho_ext.pct_change()
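    # erets is an exponentially weighted estimate of daily volatility (square root
    # of the EWM mean of squared returns, span 36), and arets are the returns
    # scaled by that estimate, i.e. returns in units of recent volatility.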
    erets = rets.pow(2).ewm(span=36).mean().pow(1 / 2)
    arets = rets / erets
    arets = arets.dropna()

    history_len = 3 * 252  # Three years
    nrows = len(arets) - history_len
    mm = np.full((nrows, history_len), np.nan)
    for ii in range(history_len, len(arets)):
        mm[ii - history_len, :] = arets[(ii - history_len + 1):(ii + 1)]
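    # Each row of mm is a sliding window of the last history_len scaled returns.
    # Small worked example (illustrative): with history_len = 3 and
    # arets = [r0, r1, r2, r3, r4], the rows are
    #   mm[0] = [r1, r2, r3]
    #   mm[1] = [r2, r3, r4]
    # i.e. row k ends at arets[k + history_len].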
    response = np.where(arets < 0, -1, 1)
    response = pd.DataFrame(response, index=arets.index)
    # Remove the first history_len + 1 rows. The extra row removed is because we
    # need to shift the features one position forward to align with the response,
    # thus we lose one more observation.
    response = response.tail(-history_len - 1)
    features = mm[:(mm.shape[0] - 1), :]
    features = pd.DataFrame(features, index=response.index)

    # print(response.head())
    # print(arets.head())
    # print(features.head().iloc[:,-3:])
    # print(response.tail())
    # print(arets.tail())
    # print(features.tail().iloc[:,-3:])

    fl = dsh.ForecastLocations(features.index, start_date="2011-12-31")
    ml = WalkForwardLoop('tf_conv', log_file='ml.log', db_url='sqlite:///ml.sqlite')
    ml.run(features, response, fl, verbose=False, tensorflow=True)

def main():
    # Init logging
    logging.basicConfig(filename='diag.log', level=logging.DEBUG)
    returns_wfl()


if __name__ == "__main__":
    main()