Created
March 15, 2023 18:03
-
-
Save gingerwizard/319607a98e5d979d8ca43a6a125b54d1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import sys | |
import time | |
import clickhouse_connect | |
from boto3 import Session | |
# Configure the root logger to write INFO-level records to stdout using a
# timestamped "name - level - message" layout.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
# Target ClickHouse service (fill in host/password before running).
CLICKHOUSE_HOST = '<host>'
CLICKHOUSE_PASSWORD = '<password>'
CLICKHOUSE_TABLE = 'blocks'
# Column used as the incremental-load high-water mark.
TIME_COLUMN = 'timestamp'
# Maximum number of 1-second polls to wait for the Redshift statement.
MAX_WAIT_CYCLES = 100
# Redshift export query; "timestamp" is cast to epoch seconds so it can be
# compared against the max value read back from ClickHouse.
COMMAND = (
    'SELECT number,hash,parent_hash,nonce,sha3_uncles,logs_bloom,transactions_root,state_root,receipts_root,'
    'miner, difficulty,total_difficulty,size,extra_data,gas_limit,'
    'gas_used,date_part(epoch, "timestamp")::integer as "timestamp", transaction_count,base_fee_per_gas '
    'FROM blocks'
)
REDSHIFT_DATABASE = 'dev'
REDSHIFT_CLUSTER_IDENTIFIER = 'redshift-cluster-1'
REDSHIFT_USER = 'awsuser'
def paginate_results(client, query_id, max_items=1000):
    """Yield batches of a Redshift Data API statement result.

    Each yielded item is a ``(column_names, rows)`` tuple, where
    ``column_names`` is a list of column-name strings and ``rows`` is a list
    of rows, each row being a list of plain values (the first value of each
    Data API field dict, e.g. ``{'longValue': 1}`` -> ``1``).

    :param client: boto3 ``redshift-data`` client
    :param query_id: statement id returned by ``execute_statement``
    :param max_items: page size passed to the paginator
    """
    # The original duplicated the fetch/unwrap/yield logic for the first page
    # and for the NextToken loop; a single loop with an optional StartingToken
    # covers both cases identically.
    paginator = client.get_paginator('get_statement_result')
    next_token = None
    while True:
        config = {'MaxItems': max_items}
        if next_token is not None:
            config['StartingToken'] = next_token
        response = paginator.paginate(Id=query_id, PaginationConfig=config).build_full_result()
        column_names = [column['name'] for column in response['ColumnMetadata']]
        rows = [[list(value.values())[0] for value in row] for row in response['Records']]
        yield column_names, rows
        next_token = response.get('NextToken')
        if next_token is None:
            break
def identify_max_time(client, table, time_column):
    """Return the largest *time_column* value currently stored in *table*.

    Used as the high-water mark so only newer rows are exported from Redshift.

    :param client: clickhouse-connect client exposing ``query``
    :param table: table name to scan
    :param time_column: column to take the maximum of
    """
    sql = f'SELECT max({time_column}) FROM {table}'
    result = client.query(sql)
    return result.first_row[0]
def read_results(cluster_identifier, database, user, command, max_cycles=100):
    """Execute *command* on Redshift via the Data API and yield result batches.

    Polls the statement status once per second; once it reports FINISHED,
    yields ``(column_names, rows)`` tuples from :func:`paginate_results`.

    :param cluster_identifier: Redshift cluster identifier
    :param database: database to run the statement in
    :param user: database user for the Data API call
    :param command: SQL text to execute
    :param max_cycles: maximum number of 1-second polls before giving up
    :raises Exception: if the statement reports FAILED, or does not finish
        within *max_cycles* polls
    """
    new_session = Session()
    redshift_client = new_session.client('redshift-data')
    response = redshift_client.execute_statement(
        Database=database,
        Sql=command,
        StatementName='row_export',
        WithEvent=False,
        ClusterIdentifier=cluster_identifier,
        DbUser=user,
    )
    query_id = response['Id']
    attempts = 0
    done = False
    # Initialized so the timeout message below is well-defined even when the
    # polling loop never runs (max_cycles <= 0).
    query_status = 'SUBMITTED'
    while attempts < max_cycles and not done:
        attempts += 1
        time.sleep(1)
        desc = redshift_client.describe_statement(Id=query_id)
        query_status = desc['Status']
        if query_status == 'FAILED':
            logger.error(f'SQL query failed: {query_id} : {desc["Error"]}')
            raise Exception(f'SQL query failed: {query_id} : {desc["Error"]}')
        elif query_status == 'FINISHED':
            done = True
            for column_names, rows in paginate_results(redshift_client, query_id, max_items=1000):
                yield column_names, rows
        else:
            logger.info('waiting for result')
    # BUG FIX: the original compared attempts against the module-level
    # MAX_WAIT_CYCLES constant instead of the max_cycles parameter, which made
    # the parameter a no-op whenever the two values differed.
    if not done and attempts >= max_cycles:
        logger.error(
            'Limit for max_cycles has been reached before the query was able to finish. We have exited out of the '
            'while-loop. You may increase the limit accordingly. \n')
        raise Exception(f'query status is: {query_status} for query id: {query_id} and command: {command}')
# Incremental sync driver: find the newest row already in ClickHouse, then
# stream every newer row from Redshift into the same table.
clickhouse_client = clickhouse_connect.get_client(
    host=CLICKHOUSE_HOST,
    port=8443,
    username='default',
    password=CLICKHOUSE_PASSWORD,
    secure=True,
)
max_time = identify_max_time(clickhouse_client, CLICKHOUSE_TABLE, TIME_COLUMN)
logger.info(f'Max time: {max_time}')
command = f"{COMMAND} WHERE timestamp > '{max_time}'"
total_rows = 0
for column_names, rows in read_results(REDSHIFT_CLUSTER_IDENTIFIER, REDSHIFT_DATABASE,
                                       REDSHIFT_USER, command, max_cycles=100):
    total_rows += len(rows)
    logger.info(f'Importing {len(rows)} rows...')
    clickhouse_client.insert(CLICKHOUSE_TABLE, rows, column_names=column_names)
logger.info(f'Done - {total_rows} rows imported')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment