Skip to content

Instantly share code, notes, and snippets.

@whittlem
Last active November 17, 2020 14:31
Show Gist options
  • Save whittlem/0bdf4d9dfe88ac71c757c96786708aa3 to your computer and use it in GitHub Desktop.
Save whittlem/0bdf4d9dfe88ac71c757c96786708aa3 to your computer and use it in GitHub Desktop.
Trading using Python — Moving Average Convergence Divergence (MACD)
# https://whittle.medium.com/trading-using-python-moving-average-convergence-divergence-macd-4dbfee1c1a37
import re
import requests
from datetime import datetime
def cbpGetHistoricRates(market='BTC-GBP', granularity=86400, iso8601start='', iso8601end=''):
    """Fetch historic closing prices for a market from the Coinbase Pro candles API.

    Args:
        market: Product identifier, e.g. 'BTC-GBP'.
        granularity: Candle width in seconds; one of 60, 300, 900, 3600,
            21600 or 86400.
        iso8601start: Optional ISO 8601 start timestamp ('' to omit).
        iso8601end: Optional ISO 8601 end timestamp ('' to omit).

    Returns:
        A dict mapping 'dd/mm/YYYY HH:MM:SS' timestamps (formatted by
        datetime.fromtimestamp, i.e. in the local timezone) to the candle's
        closing price, inserted in the reverse of the order returned by
        the API.

    Raises:
        Exception: If an argument has the wrong type, the granularity is not
            supported, a non-empty date is not valid ISO 8601, or the API
            responds with a status other than 200.
    """
    if not isinstance(market, str):
        raise Exception('Market string input expected')

    if not isinstance(granularity, int):
        raise Exception('Granularity integer input expected')

    granularity_options = [60, 300, 900, 3600, 21600, 86400]
    if granularity not in granularity_options:
        raise Exception(
            'Invalid granularity: 60, 300, 900, 3600, 21600, 86400')

    if not isinstance(iso8601start, str):
        raise Exception('ISO8601 date string input expected')

    if not isinstance(iso8601end, str):
        raise Exception('ISO8601 date string input expected')

    # iso8601 regex
    regex = r'^(-?(?:[1-9][0-9]*)?[0-9]{4})-(1[0-2]|0[1-9])-(3[01]|0[1-9]|[12][0-9])T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])(\.[0-9]+)?(Z|[+-](?:2[0-3]|[01][0-9]):[0-5][0-9])?$'
    match_iso8601 = re.compile(regex).match

    # Only non-empty dates are validated; an empty string means "not supplied".
    # (The previous `len(...) < 0` guard could never be true, so invalid dates
    # were silently passed through to the API.)
    if iso8601start and match_iso8601(iso8601start) is None:
        raise Exception('iso8601 start date is invalid')

    if iso8601end and match_iso8601(iso8601end) is None:
        raise Exception('iso8601 end date is invalid')

    api = 'https://api.pro.coinbase.com/products/' + market + '/candles?granularity=' + \
        str(granularity) + '&start=' + iso8601start + '&end=' + iso8601end

    # A timeout keeps a stalled connection from blocking the caller forever.
    resp = requests.get(api, timeout=30)
    if resp.status_code != 200:
        raise Exception('GET ' + api + ' {}'.format(resp.status_code))

    data = {}
    # The response is iterated in reverse so the dict is built in the
    # opposite order to the one the API returned.
    for price in reversed(resp.json()):
        # time, low, high, open, close, volume
        iso8601 = datetime.fromtimestamp(price[0])
        timestamp = datetime.strftime(iso8601, "%d/%m/%Y %H:%M:%S")
        data[timestamp] = price[4]

    return data
# data: dictionary { 'dd/mm/yyyy HH:MM:SS': price, 'dd/mm/yyyy HH:MM:SS': price, ... }
# num: number of periods in the average calculation; must be between 9 and 26
def exponentialMovingAverage(data, num):
    """Compute an exponential moving average (EMA) over an ordered price dict.

    The first value is seeded with the simple average of the first `num`
    values; every later value is
    ``price * 2 / (num + 1) + previous_ema * (1 - 2 / (num + 1))``.
    Each value is rounded to 2 decimal places, and the rounded value is what
    feeds the next step.

    Args:
        data: Dict mapping a key (e.g. a timestamp) to a numeric value, in
            chronological insertion order.
        num: Number of periods in the average; must be an int from 9 to 26.

    Returns:
        A dict mapping each key from position ``num - 1`` onwards to its EMA.

    Raises:
        Exception: If `data` is not a dict, `num` is not an int, `num` is
            outside 9..26, or `data` holds fewer than `num` entries.
    """
    if not isinstance(data, dict):
        raise Exception('Dictionary input expected')

    if not isinstance(num, int):
        raise Exception('Integer input expected')

    if num < 9 or num > 26:
        raise Exception('Unusual numeric input detected')

    if num > len(data):
        raise Exception('Insufficient data for calculation')

    data_keys = list(data.keys())
    data_list = list(data.values())

    result = {}
    # None marks "no previous EMA yet". A numeric sentinel such as -1 is
    # unsafe because -1 is a legitimate EMA value (e.g. for a MACD series),
    # which would wrongly restart the calculation from a simple average.
    previous_ema = None
    for index in range(num - 1, len(data_list)):
        if previous_ema is None:
            # Seed with the simple moving average of the first window.
            current_ema = round(sum(data_list[:num]) / num, 2)
        else:
            current_price = data_list[index]
            current_ema = round(
                current_price * 2 / (num + 1) + previous_ema * (1 - 2 / (num + 1)), 2)

        result[data_keys[index]] = current_ema
        previous_ema = current_ema

    return result
def movingAverageConvergenceDivergence(data):
    """Compute MACD (EMA12 - EMA26) and its 9-period signal line.

    Args:
        data: Dict mapping a key (e.g. a timestamp) to a price, in
            chronological insertion order. At least 26 entries are required.

    Returns:
        A dict with one entry per key of `data`, each a dict with the keys
        'price', 'ema12', 'ema26', 'macd' and 'signal'. A value of 0 is used
        as a placeholder wherever an indicator is not yet available.

    Raises:
        Exception: If `data` is not a dict or holds fewer than 26 entries
            (also propagated from exponentialMovingAverage).
    """
    if not isinstance(data, dict):
        raise Exception('Dictionary input expected')

    if 26 > len(data):
        raise Exception('Insufficient data for calculation')

    ema12_data = exponentialMovingAverage(data, 12)
    ema26_data = exponentialMovingAverage(data, 26)

    # MACD is only defined where both averages exist. Keeping just those
    # points (rather than padding the series with zeros) matters for the
    # signal line below: zero padding would seed the signal EMA from 0
    # instead of from real MACD values and bias its early output toward zero.
    macd_data = {}
    for key in data:
        if key in ema12_data and key in ema26_data:
            macd_data[key] = ema12_data[key] - ema26_data[key]

    # The signal line is a 9-period EMA of the MACD line, so it needs at
    # least 9 MACD points; until then it stays at the 0 placeholder.
    signal_data = {}
    if len(macd_data) >= 9:
        signal_data = exponentialMovingAverage(macd_data, 9)

    result = {}
    for key, price in data.items():
        result[key] = {
            'price': price,
            'ema12': ema12_data.get(key, 0),
            'ema26': ema26_data.get(key, 0),
            'macd': macd_data.get(key, 0),
            'signal': signal_data.get(key, 0)
        }

    return result
# Download BTC-GBP closing prices at one candle per 86400 seconds (one day),
# then derive the MACD indicators from them.
data = cbpGetHistoricRates(market='BTC-GBP', granularity=86400)
macd = movingAverageConvergenceDivergence(data=data)
def csvResults(data):
    """Print MACD results to standard output as comma-separated rows.

    A header line is always printed. Entries whose 'ema26' value is 0 are
    skipped; for the rest, 'macd' is rounded to 2 decimal places and the
    other fields are printed as-is.

    Args:
        data: Dict mapping a string key to a dict with the keys 'price',
            'ema12', 'ema26', 'macd' and 'signal'.
    """
    print('date,price,ema12,ema26,macd,signal')

    for stamp, row in data.items():
        if row['ema26'] == 0:
            continue

        columns = [
            stamp,
            str(row['price']),
            str(row['ema12']),
            str(row['ema26']),
            str(round(row['macd'], 2)),
            str(row['signal']),
        ]
        print(','.join(columns))
# Emit the computed indicators as CSV on standard output.
csvResults(data=macd)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment