@gingerwizard
Created March 15, 2023 18:03
import logging
import sys
import time
import clickhouse_connect
from boto3 import Session
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
CLICKHOUSE_HOST = '<host>'
CLICKHOUSE_PASSWORD = '<password>'
CLICKHOUSE_TABLE = 'blocks'
TIME_COLUMN = 'timestamp'
MAX_WAIT_CYCLES = 100
# Export query run against Redshift. The timestamp column is converted to epoch
# seconds so the rows can be inserted into ClickHouse directly.
COMMAND = 'SELECT number, hash, parent_hash, nonce, sha3_uncles, logs_bloom, transactions_root, state_root, ' \
          'receipts_root, miner, difficulty, total_difficulty, size, extra_data, gas_limit, ' \
          'gas_used, date_part(epoch, "timestamp")::integer AS "timestamp", transaction_count, base_fee_per_gas ' \
          'FROM blocks'
REDSHIFT_DATABASE = 'dev'
REDSHIFT_CLUSTER_IDENTIFIER = 'redshift-cluster-1'
REDSHIFT_USER = 'awsuser'
def paginate_results(client, query_id, max_items=1000):
    """Yield (column_names, rows) pages for a finished Redshift Data API statement."""
    paginator = client.get_paginator('get_statement_result')
    response = paginator.paginate(
        Id=query_id,
        PaginationConfig={
            'MaxItems': max_items
        }
    ).build_full_result()
    # Each cell is a dict keyed by its Data API type (e.g. stringValue) - unwrap the value
    yield [column['name'] for column in response['ColumnMetadata']], \
          [[list(value.values())[0] for value in row] for row in response['Records']]
    # Keep paging while the Data API returns a continuation token
    while 'NextToken' in response:
        response = paginator.paginate(
            Id=query_id,
            PaginationConfig={
                'MaxItems': max_items,
                'StartingToken': response['NextToken']
            }
        ).build_full_result()
        yield [column['name'] for column in response['ColumnMetadata']], \
              [[list(value.values())[0] for value in row] for row in response['Records']]
def identify_max_time(client, table, time_column):
    """Return the most recent value of time_column already loaded into ClickHouse."""
    response = client.query(f'SELECT max({time_column}) FROM {table}')
    return response.first_row[0]
def read_results(cluster_identifier, database, user, command, max_cycles=100):
    """Execute a statement via the Redshift Data API, poll until it finishes and yield result pages."""
    new_session = Session()
    redshift_client = new_session.client('redshift-data')
    response = redshift_client.execute_statement(
        Database=database,
        Sql=command,
        StatementName='row_export',
        WithEvent=False,
        ClusterIdentifier=cluster_identifier,
        DbUser=user,
    )
    query_id = response['Id']
    attempts = 0
    done = False
    # Poll the statement status once a second, for at most max_cycles attempts
    while attempts < max_cycles and not done:
        attempts += 1
        time.sleep(1)
        desc = redshift_client.describe_statement(Id=query_id)
        query_status = desc['Status']
        if query_status == 'FAILED':
            logger.error(f'SQL query failed: {query_id} : {desc["Error"]}')
            raise Exception(f'SQL query failed: {query_id} : {desc["Error"]}')
        elif query_status == 'FINISHED':
            done = True
            for column_names, rows in paginate_results(redshift_client, query_id, max_items=1000):
                yield column_names, rows
        else:
            logger.info('waiting for result')
    if not done and attempts >= max_cycles:
        logger.error('Limit of max_cycles was reached before the query finished. '
                     'Increase the limit if the query needs more time.\n')
        raise Exception(f'query status is: {query_status} for query id: {query_id} and command: {command}')
# Connect to ClickHouse over HTTPS
clickhouse_client = clickhouse_connect.get_client(host=CLICKHOUSE_HOST, port=8443, username='default',
                                                  password=CLICKHOUSE_PASSWORD, secure=True)
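# The target table is assumed to already exist in ClickHouse. If it does not, a rough
# sketch like the following would create one - the column types, engine and ORDER BY
# key are assumptions inferred from the exported columns, not the original schema,
# and should be adapted to the actual Redshift data.
clickhouse_client.command('''
    CREATE TABLE IF NOT EXISTS blocks
    (
        number UInt64,
        hash String,
        parent_hash String,
        nonce String,
        sha3_uncles String,
        logs_bloom String,
        transactions_root String,
        state_root String,
        receipts_root String,
        miner String,
        difficulty UInt256,
        total_difficulty UInt256,
        size UInt64,
        extra_data String,
        gas_limit UInt64,
        gas_used UInt64,
        timestamp DateTime,
        transaction_count UInt64,
        base_fee_per_gas Nullable(UInt64)
    )
    ENGINE = MergeTree
    ORDER BY timestamp
''')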
# Determine the most recent block already in ClickHouse and only export newer rows
max_time = identify_max_time(clickhouse_client, CLICKHOUSE_TABLE, TIME_COLUMN)
logger.info(f'Max time: {max_time}')
command = f'{COMMAND} WHERE timestamp > \'{max_time}\''
c = 0
for column_names, rows in read_results(REDSHIFT_CLUSTER_IDENTIFIER, REDSHIFT_DATABASE, REDSHIFT_USER, command,
                                       max_cycles=MAX_WAIT_CYCLES):
    c += len(rows)
    logger.info(f'Importing {len(rows)} rows...')
    clickhouse_client.insert(CLICKHOUSE_TABLE, rows, column_names=column_names)
logger.info(f'Done - {c} rows imported')
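# Optional sanity check: log how many rows the target table now holds
total_rows = clickhouse_client.query(f'SELECT count() FROM {CLICKHOUSE_TABLE}').first_row[0]
logger.info(f'{CLICKHOUSE_TABLE} now contains {total_rows} rows')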