Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save benyblack/77bbe3561e1686879aa976ad9f53f085 to your computer and use it in GitHub Desktop.
Save benyblack/77bbe3561e1686879aa976ad9f53f085 to your computer and use it in GitHub Desktop.
import time
import dateparser
import pytz
import json
import csv
import datetime
from dateutil.rrule import rrule, MONTHLY
from binance.client import Client
import os
import sys
import shutil
import multiprocessing as mp
from itertools import repeat as re
# Directory of the running script, as given on the command line.
pathname = os.path.dirname(sys.argv[0])
platform = sys.platform
# Absolute script directory with a trailing separator, so file and directory
# names can be appended to it directly ('\\' on win32, '/' everywhere else).
full_path = '{}{}'.format(str(os.path.abspath(pathname)), '\\' if platform == 'win32' else '/')
def one_or_many():
    """Ask whether data should be pulled for one currency pair or for many.

    Returns:
        '1' when the user answers "1" or "one" (case-insensitive, surrounding
        whitespace ignored), otherwise '2'.
    """
    data_pull_quantity = input('Would you like to pull data for one currency pair or many? (answer: 1 or many) ')
    # Strip before comparing so an answer such as "1 " is not treated as "many".
    if data_pull_quantity.strip().lower() in ('1', 'one'):
        return '1'
    return '2'
def grab_base_currency():
    """Prompt for the base currency and return it upper-cased.

    Surrounding whitespace is removed so it cannot end up inside the trading
    symbol that the caller builds from this value.
    """
    base_currency_preference = input('Which base currency would you like to grab data for?( BTC, ETH, BNB, USDT, PAX, TUSD, XRP, or USDC) ')
    return base_currency_preference.strip().upper()
def grab_quote_currency():
    """Prompt for the quote currency and return it upper-cased.

    Surrounding whitespace is removed so it cannot end up inside the trading
    symbol that the caller builds from this value.
    """
    quote_currency_preference = input('Which quote currency would you like to grab data for? ')
    return quote_currency_preference.strip().upper()
def grab_currencies_list():
    """Return every ticker symbol that ends with one of the requested currencies.

    All tickers are fetched through the Binance client, then the user is
    asked for a comma-separated list of currencies. Blank entries (e.g. from
    a trailing comma) are ignored, and neither a repeated currency nor a
    symbol matching two requested suffixes yields a duplicate, so no symbol
    is downloaded twice.

    Returns:
        list of symbol strings, grouped in the order the currencies were
        entered and, within a group, in the order the tickers were returned.
    """
    info = Client(None,None)
    pair_query = info.get_all_tickers()
    list_preference = input('Which base currencies would you like to grab data for?(eg. BTC or ETH,BNB or BTC,ETH,BNB,USDT) ')
    base_currencies_list = []
    for entry in list_preference.split(','):
        base_currency = entry.strip().upper()
        if base_currency and base_currency not in base_currencies_list:
            base_currencies_list.append(base_currency)
    currencies_list = []
    seen_symbols = set()
    for base in base_currencies_list:
        for ticker in pair_query:
            symbol = ticker['symbol']
            if symbol.endswith(base) and symbol not in seen_symbols:
                seen_symbols.add(symbol)
                currencies_list.append(symbol)
    return currencies_list
def _parse_mm_dd_yyyy(text):
    """Convert 'MM/DD/YYYY' (or 'MM.DD.YYYY') text into a datetime.date."""
    parts = str(text).strip().replace('.', '/').split('/')
    if len(parts) != 3:
        raise ValueError('Expected a date in MM/DD/YYYY format, got {!r}'.format(text))
    month, day, year = (int(part) for part in parts)
    return datetime.date(year, month, day)


def grab_date_interval():
    """Prompt for the start and end dates of the data pull.

    The start date must not be earlier than 2017-07-01; the user gets one
    retry, after which the program exits. The end date may be entered as
    "now", which means today's date.

    Both branches return the same types: ``end_date`` is always a
    ``datetime.date`` and every year/month/day component is an ``int``.

    Returns:
        tuple of (start_date, end_date, start_date_year, start_date_month,
        start_date_day, end_date_year, end_date_month, end_date_day,
        days_between).

    Raises:
        ValueError: if an entered date is not a valid MM/DD/YYYY date.
    """
    print ('What date range would you like to pull data from?\nIn MM/DD/YYYY format,except you can enter now for end date to get most recent.')
    binance_start_date = datetime.date(2017,7,1)
    start_date = _parse_mm_dd_yyyy(input('Start date: '))
    if start_date < binance_start_date:
        start_date = _parse_mm_dd_yyyy(input('Binance opened in July of 2017, please try a date later than 07/01/2017: '))
        if start_date < binance_start_date:
            print ('Error, please restart and be sure to enter dates in MM/DD/YYYY format')
            sys.exit()
    end_date_input = input('End date: ')
    if end_date_input.strip().lower() == 'now':
        end_date = datetime.date.today()
    else:
        end_date = _parse_mm_dd_yyyy(end_date_input)
    days_between = (end_date - start_date).days
    return (start_date, end_date,
            start_date.year, start_date.month, start_date.day,
            end_date.year, end_date.month, end_date.day,
            days_between)
def grab_kline_interval():
    """Prompt until the user enters one of the supported kline intervals.

    Re-prompts on every invalid answer, so the function never returns None.

    Returns:
        The chosen interval string, e.g. '15m' or '4h'.
    """
    valid_intervals = ('1m','3m','5m','15m','30m','1h','2h','4h','6h','8h','12h')
    prompt = 'What Kline Interal would you prefer? Options: 1m,3m,5m,15m,30m,1h,2h,4h,6h,8h,12h '
    while True:
        kline_interval = input(prompt).strip()
        if kline_interval in valid_intervals:
            return kline_interval
        print ('{} is an invalid option, please try again'.format(str(kline_interval)))
def create_directories(pair_list,kline_interval,start_date,end_date):
    """Create the output directory tree and return the interval directory.

    Layout, rooted at the module-level ``full_path``:
        historical_price_data/<start>_<end>_<interval>/<symbol>/

    Args:
        pair_list: symbols that each get their own sub-directory.
        kline_interval: interval string used in the directory name.
        start_date: start of the range; its ``str()`` form is used in the name.
        end_date: end of the range; a ``datetime.datetime`` is reduced to its
            'YYYY-MM-DD' date so no time of day ends up in the name.

    Returns:
        Path of the '<start>_<end>_<interval>' directory (no trailing separator).
    """
    if isinstance(end_date, datetime.datetime):
        end_date = end_date.strftime('%Y-%m-%d')
    historical_price_data_directory = '{}historical_price_data'.format(str(full_path))
    kline_interval_directory = os.path.join(
        historical_price_data_directory,
        '{}_{}_{}'.format(str(start_date),str(end_date),str(kline_interval)))
    # exist_ok makes this idempotent without first walking the directory tree.
    os.makedirs(kline_interval_directory, exist_ok=True)
    for symbol in pair_list:
        os.makedirs(os.path.join(kline_interval_directory, str(symbol)), exist_ok=True)
    return kline_interval_directory
def date_to_milliseconds(date_str):
    """Convert a human-readable date string to milliseconds since the Unix epoch.

    The string is parsed with ``dateparser``. A result without usable
    timezone information is treated as UTC.

    Args:
        date_str: date text such as '1 Jan, 2020'.

    Returns:
        int number of milliseconds since 1970-01-01 00:00:00 UTC.

    Raises:
        ValueError: if ``dateparser`` cannot interpret ``date_str``.
    """
    epoch = datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)
    d = dateparser.parse(date_str)
    if d is None:
        # dateparser signals an unparseable string by returning None.
        raise ValueError('Unable to parse date string: {!r}'.format(date_str))
    if d.tzinfo is None or d.tzinfo.utcoffset(d) is None:
        d = d.replace(tzinfo=pytz.utc)
    return int((d - epoch).total_seconds() * 1000.0)
def interval_to_milliseconds(interval):
    """Convert an interval string such as '15m' or '4h' to milliseconds.

    Args:
        interval: a whole number followed by a unit letter; supported units
            are 'm' (minutes), 'h' (hours), 'd' (days) and 'w' (weeks).

    Returns:
        int number of milliseconds, or None when the interval is empty, has an
        unsupported unit, or has a non-integer count.
    """
    seconds_per_unit = {'m': 60,'h': 60 * 60,'d': 24 * 60 * 60,'w': 7 * 24 * 60 * 60}
    if not interval:
        # Empty input has no unit character to inspect.
        return None
    unit = interval[-1]
    if unit not in seconds_per_unit:
        return None
    try:
        return int(interval[:-1]) * seconds_per_unit[unit] * 1000
    except ValueError:
        return None
def get_historical_klines(symbol, interval, start_str, end_str=None):
    """Download klines for ``symbol`` between two dates, page by page.

    Requests up to 1000 klines at a time and moves the start time past the
    last kline received until a short page signals the end of the range.

    Args:
        symbol: trading pair, e.g. 'ETHBTC'.
        interval: kline interval string, e.g. '1h'.
        start_str: start date text understood by ``date_to_milliseconds``.
        end_str: optional end date text; no upper bound when omitted.

    Returns:
        list of klines as returned by the client. May be incomplete if a
        request keeps failing; each failure is printed.
    """
    client = Client(None, None)
    output_data = []
    limit = 1000
    max_attempts = 3
    timeframe = interval_to_milliseconds(interval)
    # With an unrecognised interval, advance by 1 ms so the next page still
    # starts after the last kline received instead of re-fetching it forever.
    step = timeframe if timeframe is not None else 1
    start_ts = date_to_milliseconds(start_str)
    end_ts = None
    if end_str:
        end_ts = date_to_milliseconds(end_str)
    symbol_existed = False
    failures = 0
    while True:
        try:
            temp_data = client.get_klines(symbol=symbol,interval=interval,limit=limit,startTime=start_ts,endTime=end_ts)
        except Exception as e:
            # Best effort: the client can raise API or network errors. Retry a
            # bounded number of times, then return what was collected.
            print (str(e))
            failures += 1
            if failures >= max_attempts:
                break
            time.sleep(1)
            continue
        failures = 0
        if not symbol_existed and len(temp_data):
            symbol_existed = True
        if symbol_existed and temp_data:
            output_data += temp_data
            start_ts = temp_data[-1][0] + step
        else:
            start_ts += step
        if len(temp_data) < limit:
            break
    return output_data
def process_dates(start_date,end_date,start_date_day,start_date_month,start_date_year,end_date_day,end_date_month,end_date_year,days_between):
    """Return one datetime for every calendar month the date range touches.

    Each entry keeps the start day-of-month, clamped to the length of its
    month (e.g. the 31st becomes Feb 28/29), so short months are never
    skipped. An entry's day may therefore fall after the end date.

    Args:
        start_date, end_date, days_between: accepted for interface
            compatibility; the range is taken from the components below.
        start_date_day, start_date_month, start_date_year: start components
            (ints or numeric strings).
        end_date_day, end_date_month, end_date_year: end components
            (ints or numeric strings).

    Returns:
        list of ``datetime.datetime`` in chronological order; empty when the
        start month is later than the end month.
    """
    import calendar

    first = datetime.date(int(start_date_year), int(start_date_month), int(start_date_day))
    last = datetime.date(int(end_date_year), int(end_date_month), int(end_date_day))
    dates = []
    year, month = first.year, first.month
    while (year, month) <= (last.year, last.month):
        day = min(first.day, calendar.monthrange(year, month)[1])
        dates.append(datetime.datetime(year, month, day))
        if month == 12:
            year, month = year + 1, 1
        else:
            month += 1
    return dates
# Month names in the exact spelling used when building the date strings that
# are handed to get_historical_klines (note 'Sept').
_MONTH_ABBREVIATIONS = ('Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sept','Oct','Nov','Dec')


def _format_klines_day(day):
    """Format a date as '<day> <Mon>, <year>', e.g. '5 Jan, 2020'."""
    return '{} {}, {}'.format(day.day, _MONTH_ABBREVIATIONS[day.month - 1], day.year)


def grab_data(pair,start_date,end_date,start_date_year,start_date_month,start_date_day,end_date_year,end_date_month,end_date_day,days_between,dates,kline_interval_directory,interval,L):
    """Download one symbol's klines day by day and write one CSV per day.

    For every calendar day from the start date to the end date (inclusive),
    klines are requested from that day to the following day. Days that return
    data are written to '<dir>/<symbol>/<YYYY-MM-DD>_<interval>.csv' with the
    columns Date, Open, High, Low, Close, Volume.

    Args:
        pair: symbol to download.
        start_date, end_date: recorded unchanged in the entry appended to ``L``.
        start_date_year, start_date_month, start_date_day: first day to
            download (ints or numeric strings).
        end_date_year, end_date_month, end_date_day: last day to download
            (ints or numeric strings).
        days_between, dates: accepted for interface compatibility; the day
            range is derived from the components above.
        kline_interval_directory: directory holding one sub-directory per symbol.
        interval: kline interval string, e.g. '1h'.
        L: shared list; receives (symbol, partial_path, start_date, end_date,
            interval) once the symbol has been processed.
    """
    symbol = pair
    # The trailing '' keeps the final separator; names are appended directly.
    partial_path = os.path.join(str(kline_interval_directory), str(symbol), '')
    first_day = datetime.date(int(start_date_year), int(start_date_month), int(start_date_day))
    last_day = datetime.date(int(end_date_year), int(end_date_month), int(end_date_day))
    one_day = datetime.timedelta(days=1)
    titles = ('Date','Open','High','Low','Close','Volume')
    current = first_day
    while current <= last_day:
        start = _format_klines_day(current)
        end = _format_klines_day(current + one_day)
        print ('symbol {} start {} end {}'.format(str(symbol),str(start),str(end)))
        klines = get_historical_klines(symbol, interval, start, end)
        if len(klines)>0:
            results_csv = '{}{}_{}.csv'.format(str(partial_path),current.isoformat(),str(interval))
            # newline='' is what the csv module expects; it avoids blank rows on Windows.
            with open(results_csv, 'a', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(titles)
                # The last kline returned is deliberately left out, as before
                # (presumably it opens at the end boundary and belongs to the
                # next day's file - TODO confirm against the API).
                for kline in klines[:-1]:
                    open_epoch_timestamp = float(kline[0])/1000
                    open_time = datetime.datetime.fromtimestamp(
                        open_epoch_timestamp, datetime.timezone.utc
                    ).strftime('%Y-%m-%d %H:%M:%S.%f')
                    # kline fields: 1=open, 2=high, 3=low, 4=close, 5=volume
                    writer.writerow((open_time,kline[1],kline[2],kline[3],kline[4],kline[5]))
        current += one_day
    file_into = symbol,partial_path,start_date,end_date,interval
    L.append(file_into)
def concatenate_csvs(L):
    """Merge each symbol's per-day CSV files into one file per symbol.

    For every entry in ``L`` the files directly inside ``partial_path`` are
    concatenated, in sorted name order, into
    '<symbol>_<start>_<end>_<interval>.csv'. The header row is kept from the
    first file only and blank lines are dropped. Each merged source file is
    then moved into 'individual_csvs'. An existing concatenated file with the
    same name is first moved into 'old_concatenated_csvs'.

    Args:
        L: iterable of (symbol, partial_path, start_date, end_date, interval)
            entries. ``partial_path`` must end with a path separator. An
            ``end_date`` that is a ``datetime.datetime`` is reduced to its
            'YYYY-MM-DD' date for the file name.
    """
    for entry in L:
        symbol, partial_path, start_date, end_date, interval = entry[:5]
        partial_path = str(partial_path)
        if isinstance(end_date, datetime.datetime):
            end_date = end_date.strftime('%Y-%m-%d')
        individual_csvs_directory = '{}individual_csvs'.format(partial_path)
        os.makedirs(individual_csvs_directory, exist_ok=True)
        concat_csv = '{}_{}_{}_{}.csv'.format(str(symbol),str(start_date),str(end_date),str(interval))
        outpath = '{}{}'.format(partial_path,concat_csv)
        if os.path.isfile(outpath):
            # Keep the previous result out of the way instead of appending to it.
            old_concat_csvs_path = '{}old_concatenated_csvs'.format(partial_path)
            os.makedirs(old_concat_csvs_path, exist_ok=True)
            shutil.move(outpath, os.path.join(old_concat_csvs_path, concat_csv))
        csv_files = sorted(
            name for name in os.listdir(partial_path)
            if os.path.isfile(os.path.join(partial_path, name))
        )
        if not csv_files:
            continue
        with open(outpath,'a') as fout:
            writer = csv.writer(fout,lineterminator='\n')
            for index, name in enumerate(csv_files):
                full_file_path = '{}{}'.format(partial_path,name)
                with open(full_file_path) as f:
                    if index > 0:
                        # Only the first file contributes the header row.
                        next(f, None)
                    for line in f:
                        if len(line)>1:
                            fields = line.split(',')
                            writer.writerow(fields[:5] + [fields[5].strip()])
                shutil.move(full_file_path, os.path.join(individual_csvs_directory, name))
if __name__ == '__main__':
    # Manager-backed list that worker processes append their results to.
    manager = mp.Manager()
    L = manager.list()
    pull_quantity = one_or_many()
    if pull_quantity == '1':
        base_currency_to_grab = grab_base_currency()
        quote_currency_to_grab = grab_quote_currency()
        # The symbol is the "quote" answer followed by the "base" answer.
        pair_list = ['{}{}'.format(str(quote_currency_to_grab),str(base_currency_to_grab))]
    else:
        pair_list = grab_currencies_list()
    interval = grab_kline_interval()
    start_date,end_date,start_date_year,start_date_month,start_date_day,end_date_year,end_date_month,end_date_day,days_between = grab_date_interval()
    kline_interval_directory = create_directories(pair_list,interval,start_date,end_date)
    dates = process_dates(start_date,end_date,start_date_day,start_date_month,start_date_year,end_date_day,end_date_month,end_date_year,days_between)
    # One task per symbol; every other argument is identical across tasks
    # (``re`` is itertools.repeat). The context manager releases the pool
    # even if a worker raises.
    with mp.Pool(processes=mp.cpu_count()) as p:
        p.starmap(grab_data,zip(pair_list,re(start_date),re(end_date),re(start_date_year),re(start_date_month),re(start_date_day),re(end_date_year),re(end_date_month),re(end_date_day),re(days_between),re(dates),re(kline_interval_directory),re(interval),re(L)))
        p.close()
        p.join()
    # De-duplicate the reported entries before merging.
    L = list(set(L))
    concatenate_csvs(L)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment