Skip to content

Instantly share code, notes, and snippets.

@uneasyguy
Last active March 4, 2019 00:14
Show Gist options
  • Save uneasyguy/f8a6f1a647d796187f431c9bad98d1bc to your computer and use it in GitHub Desktop.
Save uneasyguy/f8a6f1a647d796187f431c9bad98d1bc to your computer and use it in GitHub Desktop.
import calendar
import csv
import datetime
import json
import os
import sys
import time

import dateparser
import pytz
from binance.client import Client
from dateutil.rrule import rrule, MONTHLY
# Directory portion of the script path given in argv[0], then the same
# directory made absolute with a trailing slash so relative file names can be
# appended to it directly.
pathname = os.path.dirname(sys.argv[0])
full_path = os.path.abspath(pathname) + '/'
def grab_base_currency():
    """Prompt for the base currency and return it upper-cased.

    The answer is matched case-insensitively (ignoring surrounding
    whitespace) against BTC, BNB, ETH, USDT, PAX and TUSD.  The user gets
    two attempts; after a second invalid answer the script exits.

    Returns:
        The chosen base currency as an upper-case string.

    Raises:
        SystemExit: if both answers are invalid.
    """
    valid_base_currencies = ('BTC', 'BNB', 'ETH', 'USDT', 'PAX', 'TUSD')
    prompt = 'Which base currency would you like to grab data for?( BTC, ETH, BNB, USDT, PAX or TUSD) '

    base_currency_preference = input(prompt).strip().upper()
    if base_currency_preference in valid_base_currencies:
        return base_currency_preference

    print('Invalid base currency, please try again.')
    base_currency_preference = input(prompt).strip().upper()
    if base_currency_preference in valid_base_currencies:
        return base_currency_preference

    print('Invalid entry. Please rerun script')
    # sys.exit() instead of the site-provided exit() helper, which is not
    # guaranteed to exist (e.g. when the interpreter runs without `site`).
    sys.exit()
def grab_quote_currency():
    """Prompt for the quote currency and return it upper-cased.

    Surrounding whitespace is removed so it cannot leak into the symbol
    (and directory name) that the caller builds from the answer.  No other
    validation is performed.
    """
    quote_currency_preference = input('Which quote currency would you like to grab data for? ')
    return quote_currency_preference.strip().upper()
def grab_kline_interval():
    """Prompt for a kline interval and return it.

    Accepted values are 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h and 12h
    (matched case-sensitively, ignoring surrounding whitespace).  The user
    gets two attempts; after a second invalid answer the script exits
    instead of handing ``None`` to the rest of the program.

    Returns:
        The chosen interval string.

    Raises:
        SystemExit: if both answers are invalid.
    """
    valid_intervals = ('1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '8h', '12h')
    prompt = 'What Kline Interal would you prefer? Options: 1m,3m,5m,15m,30m,1h,2h,4h,6h,8h,12h '

    kline_interval = input(prompt).strip()
    if kline_interval in valid_intervals:
        return kline_interval

    print('{} is an invalid option, please try again'.format(str(kline_interval)))
    kline_interval = input(prompt).strip()
    if kline_interval in valid_intervals:
        return kline_interval

    # Same failure handling as grab_base_currency: stop here rather than
    # returning None and failing later with an unrelated error.
    print('Invalid entry. Please rerun script')
    sys.exit()
def grab_date_interval():
    """Prompt for the start and end of the date range to download.

    Both dates are entered as MM/YYYY.  The start date is the first day of
    the given month.  The end date is the last day of the given month, or
    the current local date and time when the user answers ``now``.

    Returns:
        A ``(start_date, end_date)`` tuple.  ``start_date`` is a
        ``datetime.date``; ``end_date`` is a ``datetime.date``, or a
        ``datetime.datetime`` when ``now`` was entered.

    Raises:
        ValueError: if an answer is not numeric MM/YYYY or the month is
            outside 1-12.
        IndexError: if an answer contains no ``/`` separator.
    """
    print ('What date range would you like to pull data from?\nIn MM/YYYY format,except you can enter now for end date to get most recent.')

    start_parts = input('Start date: ').strip().split('/')
    start_date = datetime.date(int(start_parts[1]), int(start_parts[0]), 1)

    end_date_input = input('End date: ').strip()
    if end_date_input.lower() == 'now':
        end_date = datetime.datetime.now()
    else:
        end_parts = end_date_input.split('/')
        month = int(end_parts[0])
        year = int(end_parts[1])
        # monthrange() knows the real month lengths, including every leap
        # year, and rejects month numbers outside 1-12.
        last_day = calendar.monthrange(year, month)[1]
        end_date = datetime.date(year, month, last_day)
    return start_date, end_date
def create_directories(symbol):
    """Ensure ``<full_path>historical_price_data/<symbol>`` exists.

    Missing parent directories are created as well; directories that are
    already present are left untouched.

    Args:
        symbol: name of the per-pair sub-directory to create.
    """
    pair_directory = os.path.join(full_path, 'historical_price_data', str(symbol))
    # exist_ok avoids walking the whole tree to look for the folders first
    # and does not fail if the directory appears between check and create.
    os.makedirs(pair_directory, exist_ok=True)
def date_to_milliseconds(date_str):
    """Convert a human-readable date string to milliseconds since the Unix epoch.

    The string is parsed with ``dateparser.parse``.  If the parsed value
    carries no usable UTC offset it is treated as UTC.

    Args:
        date_str: date text understood by ``dateparser``.

    Returns:
        Integer milliseconds between 1970-01-01 00:00 UTC and the parsed date.

    Raises:
        ValueError: if ``dateparser`` cannot parse ``date_str``.
    """
    epoch = datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)
    d = dateparser.parse(date_str)
    if d is None:
        # dateparser signals "could not parse" by returning None.
        raise ValueError('Unable to parse date: {!r}'.format(date_str))
    if d.tzinfo is None or d.tzinfo.utcoffset(d) is None:
        d = d.replace(tzinfo=pytz.utc)
    return int((d - epoch).total_seconds() * 1000.0)
def interval_to_milliseconds(interval):
    """Convert an interval string such as ``'15m'`` or ``'2h'`` to milliseconds.

    The last character is the unit (``m`` minutes, ``h`` hours, ``d`` days,
    ``w`` weeks) and everything before it must be an integer count.

    Args:
        interval: interval text, e.g. ``'1m'``, ``'12h'``, ``'1d'``.

    Returns:
        The interval length in milliseconds, or ``None`` when the value is
        empty, has an unknown unit, or has a non-integer count.
    """
    seconds_per_unit = {'m': 60, 'h': 60 * 60, 'd': 24 * 60 * 60, 'w': 7 * 24 * 60 * 60}
    if not interval:
        # Empty input has no unit character to inspect.
        return None
    unit = interval[-1]
    if unit not in seconds_per_unit:
        return None
    try:
        return int(interval[:-1]) * seconds_per_unit[unit] * 1000
    except ValueError:
        return None
def get_historical_klines(symbol, interval, start_str, end_str=None):
    """Download klines for ``symbol`` between two dates, page by page.

    Requests of up to 500 klines are issued through ``Client.get_klines``
    until a page comes back with fewer entries than the limit.  After each
    page the next request starts one interval after the first field of the
    last kline received.

    Args:
        symbol: trading pair name passed to the client.
        interval: kline interval string, e.g. ``'1m'``.
        start_str: start date text, converted with ``date_to_milliseconds``.
        end_str: optional end date text; when omitted no end time is sent.

    Returns:
        A list with the klines of all pages, in the order received.

    Raises:
        ValueError: if ``interval`` cannot be converted to milliseconds.
    """
    timeframe = interval_to_milliseconds(interval)
    if timeframe is None:
        # Fail before any network traffic instead of with a TypeError later.
        raise ValueError('Invalid kline interval: {!r}'.format(interval))

    client = Client(None, None)  # no API key / secret supplied
    output_data = []
    limit = 500
    start_ts = date_to_milliseconds(start_str)
    end_ts = date_to_milliseconds(end_str) if end_str else None

    request_count = 0
    while True:
        temp_data = client.get_klines(symbol=symbol, interval=interval, limit=limit, startTime=start_ts, endTime=end_ts)
        if temp_data:
            output_data += temp_data
            # NOTE(review): index 0 of a kline is presumably its open time
            # in milliseconds -- confirm against the API documentation.
            start_ts = temp_data[-1][0] + timeframe
        request_count += 1
        # A short (or empty) page means there is nothing further to fetch.
        if len(temp_data) < limit:
            break
        # Pause for a second after every third request.
        if request_count % 3 == 0:
            time.sleep(1)
    return output_data
def _format_klines_date(day):
    """Render a date as 'D Mon, YYYY', the text form passed to get_historical_klines."""
    month_abbreviations = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sept', 'Oct', 'Nov', 'Dec')
    return '{} {}, {}'.format(day.day, month_abbreviations[day.month - 1], day.year)


def _write_klines_csv(csv_path, klines):
    """Write the Date/Open/High/Low/Close/Volume columns of klines to csv_path."""
    titles = ('Date', 'Open', 'High', 'Low', 'Close', 'Volume')
    # One open per file; 'w' so that a rerun replaces the file instead of
    # appending a second header and duplicate rows.  newline='' is what the
    # csv module expects for files it writes.
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(titles)
        # NOTE(review): the last kline is deliberately skipped, as in the
        # original loop -- presumably because the request's end bound returns
        # the first candle of the following day; confirm.
        for kline in klines[:-1]:
            open_epoch_timestamp = float(kline[0]) / 1000
            open_time = datetime.datetime.fromtimestamp(
                open_epoch_timestamp, datetime.timezone.utc
            ).strftime('%Y-%m-%d %H:%M:%S.%f')
            writer.writerow((open_time, kline[1], kline[2], kline[3], kline[4], kline[5]))


def grab_data(symbol, interval, start_date, end_date, dates):
    """Download klines day by day and store each day in its own CSV file.

    For every entry of ``dates`` (one per month) each calendar day of that
    month is requested from ``get_historical_klines`` with the following day
    as end bound.  Days that return data are written to
    ``<full_path>historical_price_data/<symbol>/<YYYY-MM-DD>_<interval>.csv``;
    days that return nothing produce no file.

    Args:
        symbol: trading pair name, also the name of the output sub-directory.
        interval: kline interval string, also used in the file name.
        start_date: unused; kept for interface compatibility.
        end_date: unused; kept for interface compatibility.
        dates: iterable of values whose ``str()`` starts with ``YYYY-MM``
            (e.g. ``datetime`` objects), one per month to download.
    """
    one_day = datetime.timedelta(days=1)
    for month_start in dates:
        year, month = (int(part) for part in str(month_start).split('-')[:2])
        days_in_month = calendar.monthrange(year, month)[1]
        for day_number in range(1, days_in_month + 1):
            day = datetime.date(year, month, day_number)
            start = _format_klines_date(day)
            # Date arithmetic takes care of month and year roll-over.
            end = _format_klines_date(day + one_day)
            print('symbol {} start {} end {}'.format(str(symbol), start, end))
            klines = get_historical_klines(symbol, interval, start, end)
            if not klines:
                continue
            csv_path = '{}historical_price_data/{}/{}_{}.csv'.format(
                str(full_path), str(symbol), day.isoformat(), str(interval)
            )
            _write_klines_csv(csv_path, klines)
def main():
    """Run the interactive download: ask for the pair, interval and date range, then fetch the data."""
    base = grab_base_currency()
    quote = grab_quote_currency()
    # The symbol is the quote answer followed by the base answer.
    symbol = '{}{}'.format(quote, base)
    interval = grab_kline_interval()
    create_directories(symbol)
    start_date, end_date = grab_date_interval()
    # One entry per month between the start and end dates (inclusive).
    dates = list(rrule(MONTHLY, dtstart=start_date, until=end_date))
    grab_data(symbol, interval, start_date, end_date, dates)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment