Last active
November 17, 2020 14:31
-
-
Save whittlem/0bdf4d9dfe88ac71c757c96786708aa3 to your computer and use it in GitHub Desktop.
Trading using Python — Moving Average Convergence Divergence (MACD)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://whittle.medium.com/trading-using-python-moving-average-convergence-divergence-macd-4dbfee1c1a37 | |
import re | |
import requests | |
from datetime import datetime | |
def cbpGetHistoricRates(market='BTC-GBP', granularity=86400, iso8601start='', iso8601end=''): | |
if not isinstance(market, str): | |
raise Exception('Market string input expected') | |
if not isinstance(granularity, int): | |
raise Exception('Granularity integer input expected') | |
granularity_options = [60, 300, 900, 3600, 21600, 86400] | |
if not granularity in granularity_options: | |
raise Exception( | |
'Invalid granularity: 60, 300, 900, 3600, 21600, 86400') | |
if not isinstance(iso8601start, str): | |
raise Exception('ISO8601 date string input expected') | |
if not isinstance(iso8601end, str): | |
raise Exception('ISO8601 date string input expected') | |
# iso8601 regex | |
regex = r'^(-?(?:[1-9][0-9]*)?[0-9]{4})-(1[0-2]|0[1-9])-(3[01]|0[1-9]|[12][0-9])T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])(\.[0-9]+)?(Z|[+-](?:2[0-3]|[01][0-9]):[0-5][0-9])?$' | |
if len(iso8601start) < 0: | |
match_iso8601 = re.compile(regex).match | |
if match_iso8601(iso8601start) is None: | |
raise Exception('iso8601 start date is invalid') | |
if len(iso8601end) < 0: | |
match_iso8601 = re.compile(regex).match | |
if match_iso8601(iso8601end) is None: | |
raise Exception('iso8601 end date is invalid') | |
api = 'https://api.pro.coinbase.com/products/' + market + '/candles?granularity=' + \ | |
str(granularity) + '&start=' + iso8601start + '&end=' + iso8601end | |
resp = requests.get(api) | |
if resp.status_code != 200: | |
raise Exception('GET ' + api + ' {}'.format(resp.status_code)) | |
data = {} | |
for price in reversed(resp.json()): | |
# time, low, high, open, close, volume | |
iso8601 = datetime.fromtimestamp(price[0]) | |
timestamp = datetime.strftime(iso8601, "%d/%m/%Y %H:%M:%S") | |
data[timestamp] = price[4] | |
return data | |
# data: dictionary { 'dd/mm/yyy': price, 'dd/mm/yyyy': price, ... } | |
# num: range in the average calculation, normally 9 to 26 | |
def exponentialMovingAverage(data, num): | |
if not isinstance(data, dict): | |
raise Exception('Dictionary input expected') | |
if not isinstance(num, int): | |
raise Exception('Integer input expected') | |
if num < 9 or num > 26: | |
raise Exception('Unusual numeric input detected') | |
if (num > len(data)): | |
raise Exception('Insufficient data for calculation') | |
data_keys = list(data.keys()) | |
data_list = list(data.values()) | |
last_sma = -1 | |
result = {} | |
for x in range(len(data_list) - num + 1): | |
series = data_list[x:x + num] | |
if (last_sma == -1): | |
result[data_keys[x + num - 1]] = round((sum(series) / num), 2) | |
else: | |
current_price = data[data_keys[x + num - 1]] | |
result[data_keys[x + num - 1]] = round( | |
current_price * 2 / (num + 1) + last_sma * (1 - 2 / (num + 1)), 2) | |
last_sma = result[data_keys[x + num - 1]] | |
return result | |
def movingAverageConvergenceDivergence(data): | |
if not isinstance(data, dict): | |
raise Exception('Dictionary input expected') | |
if (26 > len(data)): | |
raise Exception('Insufficient data for calculation') | |
ema12_data = exponentialMovingAverage(data, 12) | |
ema26_data = exponentialMovingAverage(data, 26) | |
macd_data = {} | |
data_keys = list(data.keys()) | |
for key in data_keys: | |
ema12 = 0 | |
if key in ema12_data: | |
ema12 = ema12_data[key] | |
ema26 = 0 | |
if key in ema26_data: | |
ema26 = ema26_data[key] | |
if (ema12 > 0) and (ema26 == 0): | |
macd_data[key] = 0 | |
else: | |
macd_data[key] = ema12 - ema26 | |
signal_data = exponentialMovingAverage(macd_data, 9) | |
result = {} | |
for key in data_keys: | |
price = 0 | |
if key in data: | |
price = data[key] | |
ema12 = 0 | |
if key in ema12_data: | |
ema12 = ema12_data[key] | |
ema26 = 0 | |
if key in ema26_data: | |
ema26 = ema26_data[key] | |
macd = 0 | |
if key in macd_data: | |
macd = macd_data[key] | |
signal = 0 | |
if key in signal_data: | |
signal = signal_data[key] | |
result[key] = { | |
'price': price, | |
'ema12': ema12, | |
'ema26': ema26, | |
'macd': macd, | |
'signal': signal | |
} | |
return result | |
data = cbpGetHistoricRates('BTC-GBP', 86400) | |
macd = movingAverageConvergenceDivergence(data) | |
def csvResults(data): | |
print('date,price,ema12,ema26,macd,signal') | |
for key in list(data.keys()): | |
if data[key]['ema26'] != 0: | |
print(key + ',' + str(data[key]['price']) + ',' + str(data[key]['ema12']) + ',' + str( | |
data[key]['ema26']) + ',' + str(round(data[key]['macd'], 2)) + ',' + str(data[key]['signal'])) | |
csvResults(macd) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment