Skip to content

Instantly share code, notes, and snippets.

@vrbadev
Last active May 29, 2024 18:33
Show Gist options
  • Save vrbadev/8f808f86b6789ad6bab2582d44a168bc to your computer and use it in GitHub Desktop.
Save vrbadev/8f808f86b6789ad6bab2582d44a168bc to your computer and use it in GitHub Desktop.
Simple Python script which gathers all available Bitstamp OHLC data for selected trading pair and interval.
# -*- coding: utf-8 -*-
"""
Created on Sat Sep 23 14:04:42 2023
@author: Vojtech Vrba (vrbavoj3@fel.cvut.cz)
Simple Python script which gathers all available Bitstamp OHLC data for selected trading pair and interval.
The data is stored in a new CSV file; if the CSV file already exists, only the new data is appended to its end.
"""
import datetime as dt
import os
import requests
import signal
import time
import tqdm
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Trading pair identifier; it is inserted into the OHLC endpoint URL and into
# the name of the CSV file.
CURRENCY_PAIR = "btceur"

# Value sent as the "limit" parameter of every bulk OHLC request.
MAX_OHLC_ROWS_PER_REQUEST = 1000

# Candle length in seconds, sent as the "step" parameter of every request.
OHLC_INTERVAL_SECONDS = 60

# Pause between two consecutive bulk requests, in seconds.
REQUEST_DELAY_SECONDS = 0.1

# CSV column names (in file order) mapped to the converter applied to each
# value before it is written.
CSV_COLUMNS_TYPES = {
    "timestamp": int,
    "open": float,
    "high": float,
    "low": float,
    "close": float,
    "volume": float,
}

# Output file; its name encodes the trading pair and the candle interval.
CSV_FILEPATH = "./bitstamp_ohlc_pair-%s_int-%d.csv" % (CURRENCY_PAIR, OHLC_INTERVAL_SECONDS)
# An efficient way to get only the header and the last line from a large CSV file
def get_csv_tail(filepath, encoding="UTF-8", chunk_size=4096):
    """Return the first line and the last non-blank line of a text file.

    The file is opened in binary mode and scanned backwards from its end in
    blocks of ``chunk_size`` bytes, so only the tail of a large file is read
    instead of the whole content.

    Args:
        filepath: Path of the file to inspect.
        encoding: Codec used to decode the two returned lines.
        chunk_size: Number of bytes fetched per backward read (at least 1).

    Returns:
        A ``(header_line, last_line)`` tuple of whitespace-stripped strings.
        ``(None, None)`` is returned for a zero-length file. ``last_line`` is
        ``None`` when nothing but whitespace follows the first line.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    chunk_size = max(1, int(chunk_size))

    with open(filepath, "rb") as f:
        raw_header = f.readline()
        if not raw_header:
            # Zero-length file: there is not even a header line.
            return None, None
        header_line = raw_header.decode(encoding).strip()

        # Never scan back into the header, so it cannot be mistaken for data.
        data_start = f.tell()
        pos = f.seek(0, 2)  # whence=2: offset is relative to the end of file
        tail = b""
        while pos > data_start:
            read_size = min(chunk_size, pos - data_start)
            pos -= read_size
            f.seek(pos)
            tail = f.read(read_size) + tail
            # Trailing blank lines are ignored; once a newline precedes the
            # remaining text, the last line is completely inside the buffer.
            if b"\n" in tail.rstrip():
                break

    # Decode only after the line is isolated, so a multi-byte character that
    # straddles two chunks is never decoded in halves.
    last_line = tail.rstrip().rpartition(b"\n")[2].decode(encoding).strip()
    return header_line, (last_line or None)
# Request OHLC data from Bitstamp server using HTTP GET, parse as JSON
def get_ohlc_data(currency_pair, start_unix, num_values=1000, interval_sec=60, timeout=30):
    """Download one batch of OHLC entries for a trading pair.

    Args:
        currency_pair: Pair identifier inserted into the endpoint URL.
        start_unix: Value sent as the ``start`` query parameter.
        num_values: Value sent as the ``limit`` query parameter.
        interval_sec: Value sent as the ``step`` query parameter.
        timeout: Seconds to wait for the server before giving up; without it
            a stalled connection would block the program indefinitely.

    Returns:
        The object stored under ``data`` -> ``ohlc`` in the JSON response.
        Callers in this file treat it as a list of mappings keyed by column
        name.

    Raises:
        requests.RequestException: On connection problems, a timeout, or an
            HTTP error status.
    """
    url = "https://www.bitstamp.net/api/v2/ohlc/%s/" % (currency_pair)
    query = {"step": interval_sec, "limit": num_values, "start": start_unix}
    response = requests.get(url, params=query, timeout=timeout)
    # Fail with a clear HTTP error instead of an obscure JSON/KeyError later.
    response.raise_for_status()
    return response.json()["data"]["ohlc"]
# Binary search to find the starting timestamp of a consistent remote OHLC data block
def find_absolute_starting_unix(currency_pair, interval_sec=60):
    """Locate the timestamp from which the remote OHLC data can be downloaded.

    A probe timestamp starts at the current time and is moved by a step that
    is halved after every request. Each request asks for two entries:

    * an empty response moves the probe forward,
    * two entries, or a single entry no older than ``interval_sec`` seconds,
      move the probe backward,
    * any other response ends the search and the timestamp of its first entry
      is returned.

    Args:
        currency_pair: Pair identifier passed to ``get_ohlc_data``.
        interval_sec: Candle length in seconds passed to ``get_ohlc_data``.

    Returns:
        The integer UNIX timestamp of the first entry of the final response.

    Raises:
        RuntimeError: If the step shrinks to zero while the server still
            returns no data, i.e. the search cannot make further progress.
    """
    now_timestamp = int(dt.datetime.now().timestamp())
    test_timestamp = now_timestamp
    step = test_timestamp // 2
    while True:
        response = get_ohlc_data(currency_pair, test_timestamp, num_values=2, interval_sec=interval_sec)
        if len(response) == 0:
            direction = 1
        elif len(response) == 2 or now_timestamp - int(response[0]["timestamp"]) <= interval_sec:
            # NOTE(review): the recency window follows interval_sec so that the
            # newest candle is recognised for intervals longer than one minute
            # as well - confirm against the API behaviour.
            direction = -1
        else:
            return int(response[0]["timestamp"])

        if step == 0:
            # The probe can no longer move; repeating the same request would
            # loop forever, so settle for the best answer available.
            if response:
                return int(response[0]["timestamp"])
            raise RuntimeError("Could not locate the start of the OHLC data for pair '%s'" % currency_pair)

        test_timestamp += direction * step
        step //= 2
# Human-readable rendering of a UNIX timestamp
def format_unix_str(timestamp):
    """Render a UNIX timestamp as a ``YYYY-MM-DD HH:MM`` string.

    The timestamp is converted with ``datetime.fromtimestamp`` without an
    explicit time zone argument.
    """
    moment = dt.datetime.fromtimestamp(timestamp)
    return moment.strftime("%Y-%m-%d %H:%M")
if __name__ == "__main__":
    # Ctrl+C only raises a flag, so the download loop can finish writing the
    # current batch before the program stops.
    terminated = False
    signal.signal(signal.SIGINT, lambda sig, frame: globals().update(terminated=True))

    now = dt.datetime.now()

    print("Checking Bitstamp data availability...")
    start_unix = find_absolute_starting_unix(CURRENCY_PAIR, interval_sec=OHLC_INTERVAL_SECONDS)
    print("Bistamp data for pair '%s' are available since timestamp %d (%s)" % (CURRENCY_PAIR, start_unix, format_unix_str(start_unix)))

    # Resume after the last saved row when a CSV file already exists.
    if os.path.exists(CSV_FILEPATH):
        header_line, last_line = get_csv_tail(CSV_FILEPATH)
        if header_line is None:
            print("Found existing CSV file, but it is empty - removing the file.")
            os.remove(CSV_FILEPATH)
        elif last_line is None:
            # Checked before parsing: there is no row to split in this case.
            print("Found existing CSV file, but it contains only header line!")
        else:
            last_row = {col_name: col_type(data) for (col_name, col_type), data in zip(CSV_COLUMNS_TYPES.items(), last_line.split(','))}
            start_unix = int(last_row["timestamp"])
            print("Found existing CSV file with last saved timestamp %d (%s)." % (start_unix, format_unix_str(start_unix)))
            start_unix += OHLC_INTERVAL_SECONDS

    # Also reached right after an empty file was removed above.
    if not os.path.exists(CSV_FILEPATH):
        print("No existing CSV file found, creating a new one.")
        with open(CSV_FILEPATH, "w") as f:
            f.write(','.join(CSV_COLUMNS_TYPES.keys()) + '\n')

    missing_values = (int(now.timestamp()) - start_unix) // OHLC_INTERVAL_SECONDS
    if missing_values > 0:
        print("Program will start downloading data starting with timestamp: %d (%s)" % (start_unix, format_unix_str(start_unix)))
        print("Missing %d-second intervals upto now: %d" % (OHLC_INTERVAL_SECONDS, missing_values))
        print("Running...")

        # last_unix is the newest timestamp already stored. start_unix is the
        # first one still missing, so the stored one lies an interval earlier;
        # otherwise a single new row at start_unix would look like a repeat.
        last_unix = start_unix - OHLC_INTERVAL_SECONDS
        with tqdm.tqdm(total=missing_values) as pbar:
            while not terminated:
                data = get_ohlc_data(CURRENCY_PAIR, start_unix, num_values=MAX_OHLC_ROWS_PER_REQUEST, interval_sec=OHLC_INTERVAL_SECONDS)

                if len(data) == 0:
                    print("\nError: No data received!")
                    break
                elif len(data) == 1 and last_unix >= int(data[-1]["timestamp"]):
                    # The only returned row is not newer than what is stored.
                    print("\nReached the end of the currently available data!")
                    break

                if len(data) != MAX_OHLC_ROWS_PER_REQUEST:
                    print("\nWarning: Got only %d/%d intervals of data!" % (len(data), MAX_OHLC_ROWS_PER_REQUEST))

                # Convert every entry to the configured column types and
                # serialise it as one CSV line.
                new_lines = list()
                for entry in data:
                    row = {c: t(entry[c]) for c, t in CSV_COLUMNS_TYPES.items()}
                    new_lines.append(','.join([str(v) for v in row.values()]) + '\n')
                    pbar.update(1)

                last_unix = row["timestamp"]
                start_unix = last_unix + OHLC_INTERVAL_SECONDS

                # One append per batch keeps the file consistent on Ctrl+C.
                with open(CSV_FILEPATH, "a") as f:
                    f.writelines(new_lines)

                pbar.set_description("Last saved: %s (timestamp %d) | Processed" % (format_unix_str(last_unix), last_unix))
                time.sleep(REQUEST_DELAY_SECONDS)
    else:
        print("No available data is missing from the CSV file.")

    print("\nProgram done!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment