Skip to content

Instantly share code, notes, and snippets.

@charlesdarkwind
Created November 3, 2022 13:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save charlesdarkwind/3fc5d12f89bce93b43b0b2554a6ad9f3 to your computer and use it in GitHub Desktop.
Save charlesdarkwind/3fc5d12f89bce93b43b0b2554a6ad9f3 to your computer and use it in GitHub Desktop.
binance2.py
# Python 3.9
import asyncio
import hashlib
import aiohttp
from aiofile import async_open, AIOFile, Reader
BASE_URL = 'https://data.binance.vision'
def get_file_name(symbol, interval, year, month, ext):
    """Build the archive file name for one symbol/interval/month.

    Args:
        symbol: Trading pair, e.g. ``'LTCBTC'``.
        interval: Kline interval, e.g. ``'1m'``.
        year: Year as a string or int, e.g. ``'2022'``.
        month: Month as a string or int. Values shorter than two
            characters are zero-padded, so ``1`` and ``'1'`` both give
            ``'01'``.
        ext: File extension without the leading dot, e.g. ``'zip'``.

    Returns:
        A name of the form ``{symbol}-{interval}-{year}-{month}.{ext}``.
    """
    # zfill leaves an already two-digit month untouched, so existing callers
    # that pass '10' get the same name as before.
    padded_month = str(month).zfill(2)
    return f'{symbol}-{interval}-{year}-{padded_month}.{ext}'
async def request(session: aiohttp.ClientSession, symbol, interval, file_name):
    """Request a monthly spot kline archive and its published checksum.

    Args:
        session: Open aiohttp client session used for both requests.
        symbol: Trading pair used in the URL path.
        interval: Kline interval used in the URL path.
        file_name: Name of the archive file to request.

    Returns:
        ``('NoSuchKey', None)`` when the server answers 404 with a
        ``NoSuchKey`` body; otherwise ``(response, checksum)`` where
        ``response`` is the still-unread archive response and ``checksum``
        is the first whitespace-separated token of the ``.CHECKSUM`` file.

    Raises:
        Exception: If either request returns an unexpected status (the
            response body is used as the message), or if the checksum
            file is empty.
    """
    url = f'{BASE_URL}/data/spot/monthly/klines/{symbol}/{interval}/' + file_name
    checksum_url = url + '.CHECKSUM'

    res = await session.get(url)
    if res.status == 404 and 'NoSuchKey' in await res.text():
        return 'NoSuchKey', None
    if res.status != 200:
        raise Exception(await res.text())

    try:
        checksum_res = await session.get(checksum_url)
        if checksum_res.status != 200:
            raise Exception(await checksum_res.text())
        # Split on any whitespace so a tab or trailing newline in the
        # checksum file cannot end up inside the digest.
        fields = (await checksum_res.text()).split()
        if not fields:
            raise Exception(f'Empty checksum file at {checksum_url}')
    except BaseException:
        # The archive body has not been read yet; close the response so its
        # connection is not left dangling when we cannot hand it to the caller.
        res.close()
        raise

    return res, fields[0]
async def write_zip(zip_data, path):
    """Write ``zip_data`` to ``path``, opening the file in binary write mode."""
    async with async_open(path, 'wb') as archive:
        await archive.write(zip_data)
async def generate_checksum(file_path):
    """Return the hexadecimal SHA-256 digest of the file at ``file_path``.

    The file is opened in binary mode and fed to the hash chunk by chunk
    through an aiofile ``Reader``.
    """
    digest = hashlib.sha256()
    async with AIOFile(file_path, 'rb') as source:
        async for block in Reader(source):
            digest.update(block)
    return digest.hexdigest()
async def verify_checksum(file_path, expected, generated):
    """Raise if the generated digest does not match the expected one.

    String digests are compared ignoring surrounding whitespace and letter
    case, since hexadecimal digests are case-insensitive.

    Args:
        file_path: Path of the checked file; used only in the error message.
        expected: Digest the file is supposed to have.
        generated: Digest computed from the file.

    Raises:
        ValueError: If the two digests differ.
    """
    def _normalize(digest):
        # Non-string values are compared as-is so they fail as a mismatch
        # (ValueError) rather than raising an unrelated AttributeError.
        return digest.strip().lower() if isinstance(digest, str) else digest

    if _normalize(expected) != _normalize(generated):
        raise ValueError(
            f'Wrong checksum for file {file_path}. expected {expected}, got {generated}.')
async def main():
    """Download one monthly kline archive and verify it against its checksum.

    Requests the LTCBTC 1m archive for 2022-10, writes it to ``./data.zip``
    and compares the file's SHA-256 digest with the published checksum.
    Prints ``No data`` and stops if the server reports the archive as missing.

    Raises:
        ValueError: If the downloaded file does not match the checksum.
        Exception: If a request fails (propagated from ``request``).
    """
    target_path = './data.zip'
    async with aiohttp.ClientSession() as session:
        symbol, interval, year, month = 'LTCBTC', '1m', '2022', '10'
        file_name = get_file_name(symbol, interval, year, month, 'zip')
        res, checksum = await request(session, symbol, interval, file_name)
        if res == 'NoSuchKey':
            print('No data')
            # Stop here: `res` is the sentinel string, not a response, so
            # falling through would fail on `res.read()`.
            return
        print('Found data')
        zip_data = await res.read()
        await write_zip(zip_data, target_path)
        generated_checksum = await generate_checksum(target_path)
        await verify_checksum(target_path, checksum, generated_checksum)
if __name__ == '__main__':
    # asyncio.run creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. Calling asyncio.get_event_loop() without a
    # running loop is deprecated in newer Python versions.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment