Skip to content

Instantly share code, notes, and snippets.

@shion24hub
Last active March 8, 2024 19:25
Show Gist options
  • Save shion24hub/98e7adc902985bc8936c10fced1bee56 to your computer and use it in GitHub Desktop.
Save shion24hub/98e7adc902985bc8936c10fced1bee56 to your computer and use it in GitHub Desktop.
import argparse
import asyncio
import gzip
import os
import shutil
import tempfile
import time
from datetime import datetime
from io import BytesIO
import aiohttp
import numpy as np
import pandas as pd
class App:
base_url = "https://public.bybit.com/trading"
def __init__(self) -> None:
# parser
self._setup_parser()
self.args = self.parser.parse_args()
# args
self.symbol = self.args.symbol
self.start = self.args.start
self.end = self.args.end
self.interval = self.args.interval
self.output_dir = self.args.output_dir
self.log_dir = self.args.log_dir
self.start_dt = datetime.strptime(self.start, "%Y%m%d")
self.end_dt = datetime.strptime(self.end, "%Y%m%d")
self.daterange = pd.date_range(self.start_dt, self.end_dt, freq="D")
self.chunked_daterange = [
self.daterange[i : i + 30] for i in range(0, len(self.daterange), 30)
]
os.makedirs(self.output_dir, exist_ok=True)
os.makedirs(self.log_dir, exist_ok=True)
def _setup_parser(self) -> None:
""" setup parser
"""
self.parser = argparse.ArgumentParser()
self.parser.add_argument("symbol", type=str, help="trading symbol")
self.parser.add_argument("start", type=str, help="start date in YYYYMMDD")
self.parser.add_argument("end", type=str, help="end date in YYYYMMDD")
self.parser.add_argument("interval", type=int, help="candle interval in seconds")
self.parser.add_argument("-o", "--output_dir", type=str, default="./", help="output directory (default: ./)")
self.parser.add_argument("-l", "--log_dir", type=str, default="./", help="log directory (default: ./)",)
def _convert_to_candle(self, df: pd.DataFrame, interval: str):
"""
convert trading data to ohlcv data.
required columns of df: ['datetime', 'side', 'size', 'price']
df:
- datetime(pd.datetime64[ns]): timestamp of the trade
- side(str): 'Buy' or 'Sell'
- size(float): size of the trade
- price(float): price of the trade
"""
df = df[["datetime", "side", "size", "price"]]
df.loc[:, ["buySize"]] = np.where(df["side"] == "Buy", df["size"], 0)
df.loc[:, ["sellSize"]] = np.where(df["side"] == "Sell", df["size"], 0)
df.loc[:, ["datetime"]] = df["datetime"].dt.floor(interval)
df = df.groupby("datetime").agg(
{
"price": ["first", "max", "min", "last"],
"size": "sum",
"buySize": "sum",
"sellSize": "sum",
}
)
# multiindex to single index
df.columns = ["_".join(col) for col in df.columns]
df = df.rename(
columns={
"price_first": "open",
"price_max": "high",
"price_min": "low",
"price_last": "close",
"size_sum": "volume",
"buySize_sum": "buyVolume",
"sellSize_sum": "sellVolume",
}
)
return df
async def _get_as_bytes(self, session: aiohttp.ClientSession, url: str, date: datetime) -> tuple[datetime, bytes]:
""" get data as bytes
"""
async with session.get(url) as response:
data = await response.read()
return (date, data)
async def _download(self, daterange: pd.DatetimeIndex) -> list[tuple[datetime, bytes]]:
""" download
download trading historical data from bybit (https://public.bybit.com/trading/).
Asynchronously GET requests csv.gz data and returns a list of its datetime and byte tuples.
"""
async with aiohttp.ClientSession() as session:
tasks = []
for date in daterange:
filename = f'{self.symbol}{date.strftime("%Y-%m-%d")}.csv.gz'
url = os.path.join(self.base_url, self.symbol, filename)
tasks.append(self._get_as_bytes(session, url, date))
results = await asyncio.gather(*tasks)
return results
async def run(self):
start_time = time.time()
with tempfile.TemporaryDirectory() as temp_dir:
for daterange in self.chunked_daterange:
results = await self._download(daterange)
for dt, data in results:
with gzip.open(BytesIO(data), "rt") as f:
df = pd.read_csv(f)
df.loc[:, ["datetime"]] = pd.to_datetime(df["timestamp"], unit="s")
df = self._convert_to_candle(df, f"{self.interval}s")
df.to_csv(os.path.join(
temp_dir, f"{dt.strftime('%Y-%m-%d')}.csv"
))
archive_path = os.path.join(self.output_dir, self.symbol)
shutil.make_archive(archive_path, "zip", temp_dir)
print(f"Time: {time.time() - start_time}")
if __name__ == "__main__":
app = App()
asyncio.run(app.run())

Description

Fetch the Bybit exchange execution data that can be obtained from "https://public.bybit.com/trading" and compose the N-second candlestick data. The candlestick data is saved in csv on a daily basis and zipped together.

Simple example

To download 1-minute (60-second) candlestick data for BTCUSDT from 1 Jan 2024 to 15 Jan 2024:

python3 adl.py BTCUSDT 20240101 20240115 60

If you want to specify the output directory, you can use output_dir or o option.

python3 adl.py BTCUSDT 20240101 20240115 60 --output_dir outs/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment