@yhilpisch
Last active November 23, 2024 19:17

AI-Powered Algorithmic Trading with Python

ODSC London 2023 Half-Day Training

Dr. Yves J. Hilpisch
CEO The Python Quants | The AI Machine
Adjunct Professor of Computational Finance

London, 15. June 2023

Short Link

http://bit.ly/odsc_ldn_2023

Slides

You can find the slides at https://certificate.tpq.io/odsc_ldn_2023.pdf

Blog Post

Read the blog post that gives you a quick overview and introduction: https://opendatascience.com/ai-powered-algorithmic-trading-with-python/

Abstract

This half-day training session covers several important Python topics and skills for applying AI and Machine Learning (ML) to Algorithmic Trading. The session shows how to use the Oanda trading API (via a demo account) to retrieve data, stream data, place orders, etc. Building on this, an ML-based trading strategy is formulated and backtested. Finally, the trading strategy is transformed into an online trading algorithm and deployed for real-time trading on the Oanda trading platform.

Session Outline

  1. Module: Setting up the Python and Oanda (paper) trading infrastructure
  2. Module: Financial data logistics and backtesting of an ML-based algorithmic trading strategy
  3. Module: Deployment of the ML-based algorithmic trading strategy in real-time
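
As a rough illustration of the backtesting step in Module 2, here is a minimal sketch in plain Python. It does not use an ML model; it assumes a simple price-versus-SMA rule and synthetic prices, and the function name `sma_backtest` is hypothetical. The position for each bar is derived only from earlier bars, and log returns are summed for the strategy and for a buy-and-hold benchmark.

```python
import math
import random


def sma_backtest(prices, window):
    """Backtest a long/short price-vs-SMA rule on a list of prices.

    Returns (strategy_log_return, benchmark_log_return).
    """
    strategy, benchmark = 0.0, 0.0
    for t in range(window, len(prices)):
        # The position for bar t uses only prices up to t - 1 (no look-ahead).
        sma = sum(prices[t - window:t]) / window
        position = 1 if prices[t - 1] > sma else -1
        log_ret = math.log(prices[t] / prices[t - 1])
        strategy += position * log_ret
        benchmark += log_ret
    return strategy, benchmark


if __name__ == "__main__":
    # Synthetic random-walk prices (an assumption, not market data).
    random.seed(100)
    prices = [1.10]
    for _ in range(500):
        prices.append(prices[-1] * math.exp(random.gauss(0, 0.001)))
    strat, bench = sma_backtest(prices, window=10)
    print(f"strategy log return: {strat:+.4f}, benchmark: {bench:+.4f}")
```

Transaction costs and bid-ask spreads are ignored here, so the numbers from such a sketch overstate what a live strategy would earn.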

Background knowledge needed

Basic knowledge of Python and data science packages, such as NumPy, pandas, and matplotlib.

Python Mastery in Finance Program

Certificate Program in Python for Finance: https://certificate.tpq.io

Python for Finance Basics Bootcamp: https://youtube.com/@dyjh

Financial Theory with Python

Our newest book about Financial Theory with Python (https://home.tpq.io/books/ftwp/).

Sign up at https://finpy.pqp.io to access all the Jupyter Notebooks and execute them on our Quant Platform.

Python for Algorithmic Trading

Our recent book about Python for Algorithmic Trading (https://py4at.tpq.io).

Sign up at https://py4at.pqp.io to access all the Jupyter Notebooks and execute them on our Quant Platform.

Artificial Intelligence in Finance

Our recent book about Artificial Intelligence in Finance (https://aiif.tpq.io).

Sign up at https://aiif.pqp.io to access all the Jupyter Notebooks and execute them on our Quant Platform.

Python for Finance (2nd ed.)

Our standard reference book about Python for Finance (http://py4fi.tpq.io).

Sign up at https://py4fi.pqp.io to access all the Jupyter Notebooks and execute them on our Quant Platform.

Further Resources

Python

If you have either Miniconda or Anaconda already installed, there is no need to install anything new.

Otherwise, you might want to install Miniconda for your operating system: https://conda.io/en/master/miniconda.html

Read more about the management of environments under: https://conda.io/projects/conda/en/latest/user-guide/tasks/manage-environments.html

Cloud

Use this link to get a 200 USD free credit for 60 days on DigitalOcean when signing up for a new account.

#
# Example Code for Algorithmic Strategy Deployment
# on Oanda (https://oanda.com)
#
# (c) Dr. Yves J. Hilpisch
# The Python Quants GmbH
#
# The code is for illustration purposes only. No warranties or representations
# to the extent permitted by applicable law. The code does not
# represent investment advice or a recommendation in any regard.
#
import re
import math
import uuid
import numpy as np
import pandas as pd
from time import sleep
from tpqoa import tpqoa
from retrying import retry
from dateutil import parser
from abc import abstractmethod, ABCMeta
from datetime import timedelta, datetime
from model import Signal, Prediction, signal_queue

valid_instruments = ['EUR_USD', 'BCO_USD']
valid_frequency = ['M1', 'M5', 'M10', 'M30']


def roundup(x, freq):
    return int(math.ceil(x / freq)) * freq


def round_time(dt=None, date_delta=timedelta(minutes=1), to='average'):
    """
    Round a datetime object to a multiple of a timedelta
    dt: datetime.datetime object, default now.
    date_delta: timedelta object, we round to a multiple of this,
    default 1 minute.
    to: 'up', 'down' or 'average' (round to nearest), default 'average'.
    from: http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python
    """
    round_to = date_delta.total_seconds()
    if dt is None:
        dt = datetime.utcnow()
    # Seconds since midnight, so that multi-minute deltas align to the clock
    # (also works for timezone-aware datetimes).
    seconds = dt.hour * 3600 + dt.minute * 60 + dt.second
    if to == 'up':
        rounding = (seconds + round_to) // round_to * round_to
    elif to == 'down':
        rounding = seconds // round_to * round_to
    else:
        rounding = (seconds + round_to / 2) // round_to * round_to
    dt = dt + timedelta(seconds=(rounding - seconds),
                        microseconds=-dt.microsecond)
    return dt


class BaseStrategy(metaclass=ABCMeta):
    def __init__(self, model_parameters, config):
        self.data_source = tpqoa(config)
        self.data = pd.DataFrame()
        self.live_price_data = pd.DataFrame()
        self.model_params = model_parameters
        self.n_bars = 120
        self.first_run = True
        self.stop_model = False
        self.model_id = uuid.uuid4()
        self.signal_count = 0
        self.feature_labels = set()
        # Important model params
        self.trading_quantity = None
        self.instrument = None
        self.frequency = None
        self.initialize()
        self.validate()

    ##################################################################
    # PRIVATE METHODS APPLICABLE FOR ALL STRATEGIES
    ##################################################################
    def _initialize_model_params(self):
        model_parameters = self.model_params
        for key, value in model_parameters.items():
            setattr(self, key, value)

    @staticmethod
    def _get_time_unit_and_duration(freq):
        freq = re.findall(r'[A-Za-z]+|\d+', freq)
        min_or_sec = freq[0]
        duration = int(freq[1])
        return duration, min_or_sec

    @retry(stop_max_attempt_number=7, wait_fixed=5000, wrap_exception=True)
    def _get_data(self, instrument, start, end, freq=None, price='M'):
        if freq is None:
            freq = self.frequency
        msg = f"Trying to get data from OANDA for {instrument} {start} {end}"
        msg += f" {freq} {price} at {datetime.utcnow()}"
        print(msg)
        start = parser.parse(start).strftime("%Y-%m-%d %H:%M:%S")
        end = parser.parse(end).strftime("%Y-%m-%d %H:%M:%S")
        raw_data = self.data_source.get_history(
            instrument, start, end, freq, price)
        return raw_data

    @staticmethod
    def _sleep_for_signal_gen(duration, signal_date):
        current_min = parser.parse(signal_date).minute
        current_second = parser.parse(signal_date).second
        next_min_level = roundup(current_min, duration)
        seconds_to_sleep = (((next_min_level - current_min) * 60) + 1 -
                            current_second)
        if seconds_to_sleep > 0:
            print(f'signal gen thread: sleeping for {seconds_to_sleep} seconds')
            sleep(seconds_to_sleep)

    @staticmethod
    def _sleep_until_next_signal(duration, min_or_sec, signal_date):
        time_diff = (parser.parse(signal_date) -
                     parser.parse(datetime.utcnow().isoformat() + 'Z'))
        seconds_diff = time_diff.seconds
        microseconds_diff = time_diff.microseconds
        # Sleep till the next min
        sleep_duration = duration
        if min_or_sec == 'M':
            sleep_duration = 60 * duration
        if seconds_diff < sleep_duration:
            msg = f'signal gen thread: sleeping for '
            msg += f'{seconds_diff + microseconds_diff / 1000000} seconds'
            print(msg)
            sleep(seconds_diff + microseconds_diff / 1000000)

    def _publish_stop_signal(self):
        signal = Signal()
        signal.signal_id = uuid.uuid4()
        signal.model_id = self.model_id
        signal.instrument = self.instrument
        signal.prediction = Prediction.STOP
        self._publish_signal(signal)

    @staticmethod
    def _publish_signal(signal):
        print(f'Publishing Signal: {signal.signal_id}')
        signal_queue.put(signal)

    def _prepare_predict_data(self, original_signal_date):
        predict_data = pd.DataFrame()
        predict_data[self.instrument + '_close'] = self.live_price_data['c']
        predict_data[self.instrument + '_open'] = self.live_price_data['o']
        predict_data[self.instrument + '_high'] = self.live_price_data['h']
        predict_data[self.instrument + '_low'] = self.live_price_data['l']
        predict_data[self.instrument + '_volume'] = self.live_price_data['volume']
        predict_data[self.instrument + '_date'] = self.live_price_data['time']
        predict_data[self.instrument + '_return'] = \
            np.log(predict_data[self.instrument + '_close'] /
                   predict_data[self.instrument + '_close'].shift(1))
        predict_data.dropna(inplace=True)
        predict_data.set_index(self.instrument + '_date', inplace=True)
        predict_data.loc[parser.parse(original_signal_date)] = 100
        return predict_data

    def _get_signal_for_prediction(self, prediction):
        signal = Signal()
        signal.signal_id = uuid.uuid4()
        signal.model_id = self.model_id
        signal.instrument = self.instrument
        signal.prediction = prediction
        signal.quantity = self.trading_quantity
        return signal

    ##################################################################
    # PUBLIC METHODS THAT CAN BE OVERRIDDEN IN THE ACTUAL STRATEGY
    ##################################################################
    def set_n_bars(self, n_bars):
        # Override the number of candles to be fetched from data source.
        self.n_bars = n_bars

    def initialize(self):
        self._initialize_model_params()

    def validate(self):
        instrument = self.model_params['instrument']
        if instrument not in valid_instruments:
            exit(f'{instrument} is not a valid/supported instrument')
        self.instrument = instrument
        frequency = self.model_params['frequency']
        if frequency not in valid_frequency:
            exit(f'{frequency} is not a valid/supported frequency')
        self.frequency = frequency
        if 'trading_quantity' not in self.model_params:
            exit('trading quantity is mandatory')
        else:
            self.trading_quantity = self.model_params['trading_quantity']
        if ('n_signals_to_gen' not in self.model_params) \
                and ('stop_time' not in self.model_params):
            exit('stop_time or n_signals_to_gen required as exit condition')

    def generate_signal(self):
        signal_date = datetime.utcnow().isoformat()[:-7] + 'Z'
        duration, min_or_sec = self._get_time_unit_and_duration(self.frequency)
        if self.first_run is True and 'trade_immediately' in self.model_params and \
                self.model_params['trade_immediately'] is True:
            self.first_run = False
        else:
            self.first_run = False
            if min_or_sec == 'M':
                self._sleep_for_signal_gen(duration, signal_date)
            signal_date = datetime.utcnow().isoformat()[:-7] + 'Z'
        print(f"generating signal now {datetime.utcnow()}")
        while True:
            try:
                self.check_for_stop_condition(signal_date)
                if self.stop_model is True:
                    self._publish_stop_signal()
                    break
                if min_or_sec == 'M':
                    signal_date = round_time(parser.parse(signal_date),
                                             date_delta=timedelta(minutes=duration),
                                             to='up').isoformat()[:-6] + 'Z'
                signal = self.predict_for_time(signal_date)
                self.signal_count += 1
                self._publish_signal(signal)
                if min_or_sec == 'M':
                    self._sleep_for_signal_gen(duration, signal_date)
                    sleep(2)
                self._sleep_until_next_signal(duration, min_or_sec, signal_date)
                signal_date = datetime.utcnow().isoformat()[:-7] + 'Z'
            except Exception as e:
                import traceback
                print(f'{traceback.format_exc()}')

    def check_for_stop_condition(self, signal_time):
        if 'n_signals_to_gen' in self.model_params:
            if self.signal_count >= self.model_params['n_signals_to_gen']:
                self.stop_model = True
        if 'stop_time' in self.model_params:
            stop_time = parser.parse(
                parser.parse(self.model_params['stop_time']
                             ).strftime("%Y-%m-%dT%H:%M:%SZ"))
            # signal_time arrives as an ISO string; parse it so that two
            # datetime objects are compared (datetime <= str raises TypeError)
            if stop_time <= parser.parse(signal_time):
                self.stop_model = True

    def predict_for_time(self, signal_date=None, is_first_run=False):
        signal_date = signal_date[:-1]
        original_signal_date = signal_date
        signal_date = parser.parse(signal_date)
        # * 3 is to avoid the lags being NaN
        time_periods_to_populate = self.n_bars
        start = self.get_starting_time(signal_date, time_periods_to_populate)
        raw_data = self._get_data(self.instrument,
                                  start.strftime("%Y-%m-%dT%H:%M:%SZ"),
                                  datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
                                  freq=self.frequency, price='M')
        raw_data_len = len(raw_data)
        time_diff = (signal_date - start).seconds / 60
        duration, min_or_sec = self._get_time_unit_and_duration(self.frequency)
        retry_count = 0
        while raw_data_len < time_diff / duration:
            sleep(2)
            if retry_count > 6:
                print("Expected candles are {} got {}; stopping model.".format(
                    str(int(time_diff / duration)), str(raw_data_len)))
                self.stop_model = True
                break
            raw_data = self._get_data(self.instrument,
                                      start.strftime("%Y-%m-%dT%H:%M:%SZ"),
                                      datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
                                      freq=self.frequency, price='M')
            raw_data_len = len(raw_data)
            retry_count += 1
            print("Expected candles are {} got {}.".format(
                str(int(time_diff / duration)), str(raw_data_len)))
        if self.stop_model is True:
            return
        raw_data.dropna(inplace=True)
        self.live_price_data = raw_data.reset_index()
        predict_data = self._prepare_predict_data(original_signal_date)
        self.custom_data_preparation(predict_data, False)
        prediction = self.on_signal(predict_data, signal_date)
        signal = self._get_signal_for_prediction(prediction)
        return signal

    def get_starting_time(self, signal_date, delta):
        duration, min_or_sec = self._get_time_unit_and_duration(self.frequency)
        if 'D' in self.frequency:
            return_date = signal_date - timedelta(days=delta * duration)
        elif 'M' in self.frequency:
            return_date = signal_date - timedelta(minutes=delta * duration)
        elif 'S' in self.frequency:
            return_date = signal_date - timedelta(seconds=delta * duration * 2)
        else:
            raise Exception(self.frequency + ' is not supported')
        return return_date

    @abstractmethod
    def custom_data_preparation(self, data, is_train_date):
        pass

    @abstractmethod
    def on_signal(self, predicted_data, signal_date):
        pass
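
The base class above schedules signals on bar boundaries: it splits a frequency string such as `M5` into a unit and a duration and waits until the next multiple of that duration. The following standalone sketch shows one way to compute that boundary with the standard library only. The helper names `parse_frequency` and `next_bar_close` are hypothetical and are not part of the gist; only minute-based frequencies are handled.

```python
import re
from datetime import datetime, timedelta


def parse_frequency(freq):
    """Split a frequency string such as 'M5' into (unit, duration)."""
    unit, duration = re.findall(r'[A-Za-z]+|\d+', freq)
    return unit, int(duration)


def next_bar_close(now, freq):
    """Return the first bar boundary strictly after `now` (minute bars only)."""
    unit, duration = parse_frequency(freq)
    if unit != 'M':
        raise ValueError(f'{freq} is not a minute-based frequency')
    step = duration * 60
    # Seconds elapsed since midnight, then jump to the next multiple of step.
    seconds = now.hour * 3600 + now.minute * 60 + now.second
    target = (seconds // step + 1) * step
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + timedelta(seconds=target)


if __name__ == "__main__":
    now = datetime(2023, 6, 15, 12, 3, 20)
    boundary = next_bar_close(now, 'M5')
    print(boundary, (boundary - now).total_seconds())
```

The difference between the boundary and the current time is the number of seconds a signal thread would need to sleep before requesting the newly completed bar.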
#
# Example Code for Algorithmic Strategy Deployment
# on Oanda (https://oanda.com)
#
# (c) Dr. Yves J. Hilpisch
# The Python Quants GmbH
#
# The code is for illustration purposes only. No warranties or representations
# to the extent permitted by applicable law. The code does not
# represent investment advice or a recommendation in any regard.
#
import threading
from model import signal_queue
from processor import SignalProcessor
from one import one  # imports the trading strategy

model_parameters = dict()
model_parameters['instrument'] = 'EUR_USD'
model_parameters['frequency'] = 'M1'
model_parameters['trading_quantity'] = 10000
model_parameters['n_signals_to_gen'] = 10
model_parameters['sma'] = 10
model_parameters['trade_immediately'] = True

if __name__ == '__main__':
    conf_file = '../oanda.cfg'
    threading.Thread(target=SignalProcessor(conf_file).listen_to_signal,
                     daemon=True).start()
    strategy = one(model_parameters, conf_file)
    strategy.generate_signal()
    signal_queue.join()
#
# Example Code for Algorithmic Strategy Deployment
# on Oanda (https://oanda.com)
#
# (c) Dr. Yves J. Hilpisch
# The Python Quants GmbH
#
# The code is for illustration purposes only. No warranties or representations
# to the extent permitted by applicable law. The code does not
# represent investment advice or a recommendation in any regard.
#
import threading
from model import signal_queue
from processor import SignalProcessor
from sma import sma  # imports the trading strategy

model_parameters = dict()
model_parameters['instrument'] = 'EUR_USD'
model_parameters['frequency'] = 'M1'
model_parameters['trading_quantity'] = 10000
model_parameters['n_signals_to_gen'] = 10
model_parameters['sma1'] = 3
model_parameters['sma2'] = 10
model_parameters['trade_immediately'] = True

if __name__ == '__main__':
    conf_file = '../oanda.cfg'
    threading.Thread(target=SignalProcessor(conf_file).listen_to_signal,
                     daemon=True).start()
    strategy = sma(model_parameters, conf_file)
    strategy.generate_signal()
    signal_queue.join()
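
The runner scripts above start a consumer in a daemon thread, let the strategy put signals on a shared queue, and then block on `signal_queue.join()`. With Python's `queue.Queue`, `join()` returns only after `task_done()` has been called once for every item that was put. The sketch below isolates that producer/consumer pattern with the standard library; the function `run_pipeline` and the string "signals" are hypothetical stand-ins, not the gist's classes.

```python
import queue
import threading


def run_pipeline(signals):
    """Put signals on a queue, process them in a daemon thread, wait for all."""
    signal_queue = queue.Queue()
    processed = []

    def listen_to_signal():
        while True:
            signal = signal_queue.get()  # blocks until a signal arrives
            try:
                processed.append(f'processed {signal}')
            finally:
                # Without this call, signal_queue.join() would never return.
                signal_queue.task_done()

    threading.Thread(target=listen_to_signal, daemon=True).start()
    for signal in signals:
        signal_queue.put(signal)
    signal_queue.join()  # returns once every queued signal is marked done
    return processed


if __name__ == "__main__":
    print(run_pipeline(['LONG', 'SHORT', 'STOP']))
```

Because the consumer is a daemon thread, it does not keep the interpreter alive once the main thread has returned from `join()`.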
#
# Example Code for Algorithmic Strategy Deployment
# on Oanda (https://oanda.com)
#
# (c) Dr. Yves J. Hilpisch
# The Python Quants GmbH
#
# The code is for illustration purposes only. No warranties or representations
# to the extent permitted by applicable law. The code does not
# represent investment advice or a recommendation in any regard.
#
import queue
from enum import Enum

signal_queue = queue.Queue()


class Prediction(Enum):
    LONG = 1
    SHORT = 2
    NEUTRAL = 3
    STOP = 4


class Signal:
    def __init__(self):
        self.model_id = None
        self.signal_id = None
        self.instrument = None
        self.prediction_time = None
        self.prediction = None
        self.quantity = None

    def __repr__(self):
        return str(self.__dict__)


class SignalProcessingException(Exception):
    pass
[oanda]
access_token = c71f59_YOURACCESSTOKEN_13da5b8f60fac1397232
account_id = 101-004-YOURID-001
account_type = practice
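
The configuration file above uses the INI format, so it can be inspected with Python's standard `configparser` module before it is handed to the trading wrapper. The sketch below is only a sanity check under that assumption; the helper name `load_oanda_settings` is hypothetical, and the token and account ID are the placeholders from the file, not working credentials.

```python
import configparser

SAMPLE = """
[oanda]
access_token = c71f59_YOURACCESSTOKEN_13da5b8f60fac1397232
account_id = 101-004-YOURID-001
account_type = practice
"""

REQUIRED_KEYS = ('access_token', 'account_id', 'account_type')


def load_oanda_settings(text):
    """Parse INI text and return the [oanda] settings as a plain dict."""
    config = configparser.ConfigParser()
    config.read_string(text)
    section = config['oanda']
    # A missing key raises KeyError, which surfaces a typo early.
    return {key: section[key] for key in REQUIRED_KEYS}


if __name__ == "__main__":
    settings = load_oanda_settings(SAMPLE)
    print(settings['account_id'], settings['account_type'])
```

To check a file on disk instead of a string, `config.read(path)` can replace `config.read_string(text)`.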
#
# Example Code for Algorithmic Strategy Deployment
# on Oanda (https://oanda.com)
#
# (c) Dr. Yves J. Hilpisch
# The Python Quants GmbH
#
# The code is for illustration purposes only. No warranties or representations
# to the extent permitted by applicable law. The code does not
# represent investment advice or a recommendation in any regard.
#
''' This is a trading strategy template for an algorithmic trading strategy based
on technical indicators that can be defined flexibly.
'''
import numpy as np
from model import Prediction
from base import BaseStrategy


class one(BaseStrategy):
    """
    This is the set of default model parameters.
    Override and add where applicable.
    """

    def __init__(self, model_parameters, config):
        super().__init__(model_parameters, config)
        if model_parameters['sma'] > self.n_bars:
            self.n_bars = model_parameters['sma'] * 3

    def custom_data_preparation(self, data, is_training_data):
        """
        Add required data preparations here.
        """
        prediction = self.instrument + '_prediction'
        data['sma'] = (data[self.instrument + '_close'].rolling(
            self.sma).mean().shift(1))
        data.dropna(inplace=True)
        price = data[self.instrument + '_close'].shift(1)
        data[prediction] = np.where(price > data['sma'], 1, -1)

    def on_signal(self, predicted_data, signal_date):
        """
        This method is called every time the strategy generates a signal.
        """
        direction = predicted_data.loc[signal_date][
            self.instrument + '_prediction']
        if direction == -1:
            prediction = Prediction.SHORT
        else:
            prediction = Prediction.LONG
        return prediction
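
The strategy above only fills in two hooks, `custom_data_preparation` and `on_signal`, while the base class owns the workflow that calls them. A stripped-down sketch of that template-method structure follows. `MiniBase` and `PriceVsSma` are hypothetical stand-ins that work on plain lists instead of pandas objects and make no API calls.

```python
from abc import ABC, abstractmethod
from enum import Enum


class Prediction(Enum):
    LONG = 1
    SHORT = 2


class MiniBase(ABC):
    """Hypothetical stand-in for the base class: owns the workflow only."""

    def predict(self, closes):
        data = {'close': list(closes)}
        self.custom_data_preparation(data)  # hook 1: add features
        return self.on_signal(data)         # hook 2: map features to a call

    @abstractmethod
    def custom_data_preparation(self, data):
        pass

    @abstractmethod
    def on_signal(self, data):
        pass


class PriceVsSma(MiniBase):
    def __init__(self, window):
        self.window = window

    def custom_data_preparation(self, data):
        closes = data['close']
        data['sma'] = sum(closes[-self.window:]) / self.window

    def on_signal(self, data):
        last = data['close'][-1]
        return Prediction.LONG if last > data['sma'] else Prediction.SHORT


if __name__ == "__main__":
    print(PriceVsSma(3).predict([1, 2, 3, 4]))
```

Instantiating `MiniBase` directly raises `TypeError`, which is how the abstract hooks force each concrete strategy to supply its own logic.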
#
# Example Code for Algorithmic Strategy Deployment
# on Oanda (https://oanda.com)
#
# (c) Dr. Yves J. Hilpisch
# The Python Quants GmbH
#
# The code is for illustration purposes only. No warranties or representations
# to the extent permitted by applicable law. The code does not
# represent investment advice or a recommendation in any regard.
#
from enum import Enum
from tpqoa import tpqoa
from threading import RLock
from collections import defaultdict
from model import signal_queue, Prediction, SignalProcessingException


class StateAttrDef:
    QUANTITY = 'quantity'
    PRICE = 'price'
    REAL_PNL = 'real_pnl'


class SignalProcessingType(Enum):
    GO_LONG = 1
    GO_SHORT = 2
    GO_LONG_FROM_SHORT = 3
    GO_SHORT_FROM_LONG = 4
    STOP = 5
    NOOP = 6


class SignalProcessor:
    def __init__(self, conf_file='../oanda.cfg'):
        self.state = defaultdict(self._get_empty_state)
        self.oanda = tpqoa(conf_file)
        self.lock = RLock()

    @staticmethod
    def _get_empty_state():
        empty_state = dict()
        empty_state[StateAttrDef.QUANTITY] = 0.0
        empty_state[StateAttrDef.PRICE] = 0.0
        empty_state[StateAttrDef.REAL_PNL] = 0.0
        return empty_state
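
    def listen_to_signal(self):
        while True:
            signal = signal_queue.get()
            try:
                print(f'Processing signal: {signal.signal_id}')
                self.process_signal(signal)
            except Exception as e:
                import traceback
                print(f'{traceback.format_exc()}')
            finally:
                # mark the item as handled so that signal_queue.join()
                # in the runner script can return
                signal_queue.task_done()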

    def process_signal(self, signal):
        with self.lock:
            current_state = self.state[signal.instrument]
            print(f'Start processing signal {signal}.\nState={current_state}')
            signum_value = self._get_signum(current_state[StateAttrDef.QUANTITY])
            processing_type = self._get_processing_type(
                signum_value, signal.prediction)
            print(f'Processing type: {processing_type}')
            state = self.place_trades_for_signal(
                signal, processing_type, current_state)
            print(f'Processed signal: {signal}.\nState={state}')

    def place_trades_for_signal(self, signal, processing_type, current_state):
        if SignalProcessingType.NOOP == processing_type:
            return current_state
        if processing_type in [SignalProcessingType.GO_LONG,
                               SignalProcessingType.GO_LONG_FROM_SHORT]:
            net_qty = signal.quantity + abs(current_state[StateAttrDef.QUANTITY])
        elif processing_type in [SignalProcessingType.GO_SHORT,
                                 SignalProcessingType.GO_SHORT_FROM_LONG]:
            net_qty = ((-1 * signal.quantity) -
                       abs(current_state[StateAttrDef.QUANTITY]))
        elif processing_type == SignalProcessingType.STOP:
            net_qty = -1 * current_state[StateAttrDef.QUANTITY]
        else:
            msg = f'Invalid processing type {processing_type} encountered.'
            raise SignalProcessingException(msg)
        order_response = self._place_trade_for_ins(signal.instrument, net_qty)
        updated_state = self._update_state(order_response, current_state)
        return updated_state

    @staticmethod
    def _update_state(order_response, state):
        if 'tradesClosed' in order_response:
            for closed_trd in order_response['tradesClosed']:
                state[StateAttrDef.QUANTITY] += float(closed_trd['units'])
                state[StateAttrDef.REAL_PNL] += float(closed_trd['realizedPL'])
            if abs(state[StateAttrDef.QUANTITY]) <= 0.0001:
                state[StateAttrDef.PRICE] = 0.0
        if 'tradeOpened' in order_response:
            open_trade = order_response['tradeOpened']
            state[StateAttrDef.QUANTITY] += float(open_trade['units'])
            state[StateAttrDef.PRICE] += float(open_trade['price'])
        return state

    def _place_trade_for_ins(self, instrument, qty):
        response = self.oanda.create_order(
            instrument, qty, ret=True, suppress=True)
        # if type is not ORDER_FILL, there is some problem with the order placement.
        if response['type'] != 'ORDER_FILL':
            raise SignalProcessingException(f'Error creating order: {response}.')
        return response

    @staticmethod
    def _get_processing_type(signum_value, prediction):
        if Prediction.STOP == prediction:
            return SignalProcessingType.STOP
        if signum_value == 0:
            return SignalProcessingType.GO_LONG if prediction == Prediction.LONG \
                else SignalProcessingType.GO_SHORT
        if signum_value == 1:
            return SignalProcessingType.GO_SHORT_FROM_LONG \
                if prediction == Prediction.SHORT \
                else SignalProcessingType.NOOP
        if signum_value == -1:
            return SignalProcessingType.GO_LONG_FROM_SHORT \
                if prediction == Prediction.LONG \
                else SignalProcessingType.NOOP

    @staticmethod
    def _get_signum(x):
        if x == 0:
            return 0
        else:
            return 1 if x > 0 else -1
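
The signal processor above turns a prediction and the current position into a single net order. A compact, standalone sketch of that sizing rule is given below. The function `order_units` and its string predictions are hypothetical simplifications of the enum-based logic, and no order is sent anywhere.

```python
def order_units(current_units, prediction, trade_units):
    """Return the signed number of units to trade for a prediction.

    current_units: signed open position (positive long, negative short)
    prediction: 'LONG', 'SHORT' or 'STOP'
    trade_units: target position size for a fresh long or short
    """
    if prediction == 'STOP':
        return -current_units  # flatten whatever is open
    if prediction == 'LONG':
        if current_units > 0:
            return 0  # already long, nothing to do
        # close any short and open the long in one order
        return trade_units + abs(current_units)
    if prediction == 'SHORT':
        if current_units < 0:
            return 0  # already short, nothing to do
        # close any long and open the short in one order
        return -trade_units - abs(current_units)
    raise ValueError(f'unknown prediction {prediction!r}')


if __name__ == "__main__":
    print(order_units(0, 'LONG', 10000))
    print(order_units(10000, 'SHORT', 10000))
```

Reversing a position therefore trades twice the base quantity: one half closes the existing position and the other half opens the opposite one.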
#
# Example Code for Algorithmic Strategy Deployment
# on Oanda (https://oanda.com)
#
# (c) Dr. Yves J. Hilpisch
# The Python Quants GmbH
#
# The code is for illustration purposes only. No warranties or representations
# to the extent permitted by applicable law. The code does not
# represent investment advice or a recommendation in any regard.
#
''' This is a trading strategy template for an algorithmic trading strategy based
on technical indicators that can be defined flexibly.
'''
import numpy as np
from model import Prediction
from base import BaseStrategy


class sma(BaseStrategy):
    """
    This is the set of default model parameters.
    Override and add where applicable.
    """

    def __init__(self, model_parameters, config):
        super().__init__(model_parameters, config)
        # Fetch more bars only if the longer SMA window exceeds the default.
        # (`a or b > c` would be truthy for any non-zero `a`.)
        longest_sma = max(model_parameters['sma1'], model_parameters['sma2'])
        if longest_sma > self.n_bars:
            self.n_bars = longest_sma * 3

    def custom_data_preparation(self, data, is_training_data):
        """
        Add required data preparations here.
        """
        prediction = self.instrument + '_prediction'
        data['sma1'] = (data[self.instrument + '_close'].rolling(
            self.sma1).mean().shift(1))
        data['sma2'] = (data[self.instrument + '_close'].rolling(
            self.sma2).mean().shift(1))
        data.dropna(inplace=True)
        data[prediction] = np.where(data['sma1'] > data['sma2'], 1, -1)

    def on_signal(self, predicted_data, signal_date):
        """
        This method is called every time the strategy generates a signal.
        """
        direction = predicted_data.loc[signal_date][
            self.instrument + '_prediction']
        if direction == -1:
            prediction = Prediction.SHORT
        else:
            prediction = Prediction.LONG
        return prediction
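
The crossover rule in this strategy compares a short and a long moving average, both lagged by one bar. The sketch below reproduces that comparison on a plain list so the lag is explicit. The helper names `lagged_sma` and `crossover_directions` are hypothetical, and the prices are made up for illustration.

```python
def lagged_sma(values, window, end):
    """Mean of the `window` values that end just before index `end`."""
    return sum(values[end - window:end]) / window


def crossover_directions(closes, fast, slow):
    """Return (index, direction) pairs; +1 if fast SMA > slow SMA, else -1.

    Both averages use only bars before the index, which mirrors
    rolling(...).mean().shift(1) on a pandas Series.
    """
    directions = []
    for t in range(slow, len(closes)):
        fast_sma = lagged_sma(closes, fast, t)
        slow_sma = lagged_sma(closes, slow, t)
        directions.append((t, 1 if fast_sma > slow_sma else -1))
    return directions


if __name__ == "__main__":
    closes = [1, 2, 3, 4, 5, 4, 3, 2, 1]
    for index, direction in crossover_directions(closes, fast=2, slow=4):
        print(index, direction)
```

In this toy series the direction flips from +1 to -1 two bars after the price peak, which illustrates the delay that moving-average rules introduce.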