import csv
import datetime
import gzip
import logging
import os
import requests
import simplejson as json
import sys
import time
from typing import List
logging.basicConfig()
_logger: logging.Logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)
API_URL = "https://core-hsr.dune.com/v1/graphql"
QUERY_ID = 283432 # 283763
HEADERS = {"x-hasura-api-key": "<get_it_from_kms>"}
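# Assumptions, for orientation: core-hsr.dune.com appears to front a Hasura
# GraphQL layer (hence the x-hasura-api-key header), and QUERY_ID refers to a
# saved Dune query parameterized by start_date/end_date. The API key value is a
# placeholder; fetch the real one (e.g. from KMS) before running.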
CSV_COLUMNS = [
    "trade_date",
    "project",
    "version",
    "category",
    "token_a_symbol",
    "token_b_symbol",
    "exchange_contract_address",
    "token_a_amount",
    "token_b_amount",
    "usd_amount",
    "trades",
    "max_trade_usd_amount",
]
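# (The names above mirror the SELECT aliases of the saved Dune query; see the
# SQL reproduced in the docstring of download_dune_query_to_csv below.)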
def execute_with_retry(url, query, variables, headers=None, max_retries=5):
    """POST a GraphQL query, retrying on failure with a fixed 5-second backoff."""
    for attempt in range(1, max_retries + 1):
        try:
            req = requests.post(url, json={"query": query, "variables": variables}, headers=headers or {})
            result = json.loads(req.text)
            if "data" in result:
                return result["data"]
            return result
        except Exception as e:
            _logger.error(sys.exc_info()[0])
            _logger.error(e)
            _logger.error(f"Error retry {attempt}/{max_retries}")
            time.sleep(5)
    raise Exception("Reached max retry. Exit now.")
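
# A minimal usage sketch for the helper above (hypothetical query, for
# illustration only):
#
#   data = execute_with_retry(
#       API_URL,
#       "query { __typename }",
#       variables={},
#       headers=HEADERS,
#   )
#
# On success this returns the response's "data" payload; if the server answers
# with top-level "errors" instead, the full body is returned so the caller can
# inspect them.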
def download_dune_query_to_csv(query_id: int, start_date: str, end_date: str, csv_columns: List, output_dir: str):
    """
    Download one [start_date, end_date) slice of a saved Dune query to a gzipped CSV.

    The GraphQL logic is based on
    https://gist.github.com/mewwts/7ec78380018ed0581d45172761d546ba

    The saved query behind `query_id` is expected to be SQL along these lines:

        SELECT
            date(block_time) trade_date
            , project
            , version
            , category
            , token_a_symbol
            , token_b_symbol
            , encode(exchange_contract_address, 'hex') as exchange_contract_address
            , SUM(token_a_amount) as token_a_amount
            , SUM(token_b_amount) as token_b_amount
            , SUM(usd_amount) as usd_amount
            , COUNT(*) as trades
            , MAX(usd_amount) as max_trade_usd_amount
        FROM dex."trades" t
        WHERE category = 'DEX'
        AND block_time >= '{{start_date}}'::date
        AND block_time < '{{end_date}}'::date
        GROUP BY 1,2,3,4,5,6,7
    """
    gquery_get_result = """
    query GetResult ($query_id: Int!, $parameters: [Parameter!]){
        get_result_v2(query_id: $query_id, parameters: $parameters) {
            job_id
            result_id
            error_id
            __typename
        }
    }
    """
    dex_volume_params = {
        "query_id": query_id,
        "parameters": [
            {"key": "start_date", "type": "datetime", "value": f"{start_date}"},
            {"key": "end_date", "type": "datetime", "value": f"{end_date}"},
        ],
    }
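    # The parameter keys map to the {{start_date}} / {{end_date}} placeholders
    # in the saved query's SQL (shown in the docstring above).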
    gquery_get_result_by_result_id = """
    query FindResultDataByResult($result_id: uuid!){
        query_results(where: {id: {_eq: $result_id}}) {
            id
            job_id
            error
            runtime
            generated_at
            columns
            __typename
        }
        get_result_by_result_id(args: {want_result_id: $result_id}) {
            data
            __typename
        }
    }
    """
    # Main logic: poll get_result_v2 until the execution has a result_id to fetch.
    while True:
        result = execute_with_retry(API_URL, gquery_get_result, dex_volume_params)
        _logger.info(result)
        if "errors" in result:
            raise RuntimeError(result["errors"][0])
        if (result.get("get_result_v2") or {}).get("result_id") is not None:
            break
        time.sleep(5)
    result_id = result["get_result_v2"]["result_id"]
    error_id = result["get_result_v2"].get("error_id")
    if error_id is not None:
        # When the most recent execution of a query returned an error, get_result_v2
        # reports that query_error id alongside the most recent successful result.
        _logger.warning(f"The most recent execution resulted in an error with id: {error_id}")
    _logger.info(f"Waiting for RESULT_ID: {result_id}")
    get_result_by_result_id_params = {"result_id": result_id}
    result = execute_with_retry(
        url=API_URL,
        query=gquery_get_result_by_result_id,
        variables=get_result_by_result_id_params,
        headers=HEADERS,
    )
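    # get_result_by_result_id returns one element per result row, each wrapped
    # as {"data": {...}}; the inner dict keys line up with the CSV columns.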
    record_count = len(result["get_result_by_result_id"])
    output_rows = [row["data"] for row in result["get_result_by_result_id"]]
    # One gzipped CSV per month, named with a Hive-style month= partition key.
    tag = start_date[0:7].replace("-", "")
    csv_file = f"{output_dir}/dex_daily_volume.month={tag}.csv.gz"
    os.makedirs(output_dir, exist_ok=True)
    with gzip.open(csv_file, "wt") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
        writer.writeheader()
        for row in output_rows:
            writer.writerow(row)
    _logger.info(f"{csv_file} downloaded with {record_count:>12} records.")
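
# A minimal single-month invocation (illustrative; these are the same values
# used in the __main__ block below):
#
#   download_dune_query_to_csv(
#       query_id=QUERY_ID,
#       start_date="2021-12-01 00:00:00",
#       end_date="2022-01-01 00:00:00",
#       csv_columns=CSV_COLUMNS,
#       output_dir="/var/tmp/dune",
#   )
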
if __name__ == "__main__":
    # Generate a [start_date, end_date) pair for each calendar month in the range.
    start_date = "2021-12-01"
    end_date = "2022-02-01"
    all_query_dates = []
    s = datetime.datetime.strptime(start_date, "%Y-%m-%d").replace(day=1)
    end = datetime.datetime.strptime(end_date, "%Y-%m-%d")
    while s < end:
        # Jump past the end of the month, then snap back to the 1st to get the
        # start of the next month.
        e = (s + datetime.timedelta(days=32)).replace(day=1)
        all_query_dates.append(s.strftime("%Y-%m-%d 00:00:00") + "|" + e.strftime("%Y-%m-%d 00:00:00"))
        s = e
    # Process the most recent months first.
    all_query_dates.sort(reverse=True)
    for i, pair in enumerate(all_query_dates):
        (start_date, end_date) = pair.split("|")
        _logger.info(f"{i:>4} Trying [{start_date}, {end_date}):")
        download_dune_query_to_csv(
            query_id=QUERY_ID,
            start_date=start_date,
            end_date=end_date,
            csv_columns=CSV_COLUMNS,
            output_dir="/var/tmp/dune",
        )