S3 to ClickHouse

Two AWS Lambda handlers for loading a CSV object from S3 into a ClickHouse table: the first has ClickHouse read the object directly through its s3() table function, the second downloads the object with boto3 and inserts the parsed rows via clickhouse_driver.

# --- Variant 1: let ClickHouse read the CSV directly with the s3() table function ---
import os
from clickhouse_driver import Client
# Initialize ClickHouse client
clickhouse_host = os.environ['CLICKHOUSE_HOST']
clickhouse_port = int(os.environ['CLICKHOUSE_PORT'])
clickhouse_user = os.environ['CLICKHOUSE_USER']
clickhouse_password = os.environ['CLICKHOUSE_PASSWORD']
clickhouse_database = os.environ['CLICKHOUSE_DATABASE']
clickhouse_table = os.environ['CLICKHOUSE_TABLE']
client = Client(host=clickhouse_host,
                port=clickhouse_port,
                user=clickhouse_user,
                password=clickhouse_password,
                database=clickhouse_database)
def lambda_handler(event, context):
    # Get the S3 bucket name and object key from the event
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']

    # Construct the full S3 URI for the object
    s3_uri = f's3://{bucket_name}/{object_key}'

    # Construct ClickHouse query to load CSV data directly from S3
    # NOTE: The CSV file must be publicly accessible or the ClickHouse server
    # must have permissions to read from the S3 bucket.
    # Replace the column list below with the target table's actual schema.
    query = f"""
        INSERT INTO {clickhouse_table}
        SELECT * FROM s3('{s3_uri}', 'CSVWithNames', 'column1 UInt32, column2 String, ...')
    """

    # Execute the query
    result = client.execute(query)

    # Return the result of the query execution
    return {
        'statusCode': 200,
        'body': f"Successfully loaded data into ClickHouse table {clickhouse_table} from {s3_uri}"
    }
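
If the bucket is not public, ClickHouse's s3() table function also accepts explicit credentials (s3(url, access_key_id, secret_access_key, format, structure)). Below is a minimal sketch of that variant, assuming the placeholder two-column schema used in the query above; the AWS_KEY / AWS_SECRET environment variables, the example_uri parameter, and the MergeTree table definition are illustrative additions, not part of the original gist.

# Sketch only: create the target table with the placeholder schema assumed above,
# then load a CSV from a non-public bucket by passing credentials to s3() explicitly.
# AWS_KEY / AWS_SECRET and example_uri are hypothetical, not part of the original gist.
def load_with_credentials(example_uri: str) -> None:
    client.execute(f"""
        CREATE TABLE IF NOT EXISTS {clickhouse_table}
        (
            column1 UInt32,
            column2 String
        )
        ENGINE = MergeTree
        ORDER BY column1
    """)
    aws_key = os.environ.get('AWS_KEY', '')
    aws_secret = os.environ.get('AWS_SECRET', '')
    client.execute(f"""
        INSERT INTO {clickhouse_table}
        SELECT * FROM s3(
            '{example_uri}',
            '{aws_key}', '{aws_secret}',
            'CSVWithNames',
            'column1 UInt32, column2 String'
        )
    """)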

# --- Variant 2: download the object with boto3 and insert the rows with clickhouse_driver ---
import os
import boto3
import csv
from clickhouse_driver import Client
# Initialize ClickHouse client
clickhouse_host = os.environ['CLICKHOUSE_HOST']
clickhouse_port = int(os.environ['CLICKHOUSE_PORT'])
clickhouse_user = os.environ['CLICKHOUSE_USER']
clickhouse_password = os.environ['CLICKHOUSE_PASSWORD']
clickhouse_database = os.environ['CLICKHOUSE_DATABASE']
clickhouse_table = os.environ['CLICKHOUSE_TABLE']
clickhouse_client = Client(host=clickhouse_host,
                           port=clickhouse_port,
                           user=clickhouse_user,
                           password=clickhouse_password,
                           database=clickhouse_database)
def lambda_handler(event, context):
    # Get bucket name and file key from the S3 event
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    file_key = event['Records'][0]['s3']['object']['key']

    # Initialize S3 client
    s3_client = boto3.client('s3')

    # Get the file object from S3
    file_obj = s3_client.get_object(Bucket=bucket_name, Key=file_key)

    # Read the file content
    file_content = file_obj['Body'].read().decode('utf-8')

    # Parse the content as CSV, skipping the header row and any blank lines
    csv_reader = csv.reader(file_content.splitlines())
    next(csv_reader, None)  # assumes the first line is a header, as in the s3() handler above
    data_to_insert = [row for row in csv_reader if row]

    # Insert data into ClickHouse
    # NOTE: csv.reader yields strings; non-String columns may need explicit
    # type conversion before the insert (see the typed sketch below).
    clickhouse_client.execute(f'INSERT INTO {clickhouse_table} VALUES', data_to_insert)

    return {
        'statusCode': 200,
        'body': f'Successfully inserted data into ClickHouse table {clickhouse_table}'
    }
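
clickhouse_driver sends values over ClickHouse's native protocol, so non-String columns generally need the string values produced by csv.reader converted to matching Python types before the insert. A minimal sketch of that conversion, assuming the same placeholder column1 UInt32 / column2 String schema as the first handler; the insert_typed helper is an illustrative addition, not part of the original gist.

# Sketch only: convert csv.reader's string values to the Python types the target
# columns expect (assumed schema: column1 UInt32, column2 String).
def insert_typed(rows):
    typed_rows = [(int(row[0]), row[1]) for row in rows]
    clickhouse_client.execute(
        f'INSERT INTO {clickhouse_table} (column1, column2) VALUES',
        typed_rows,
    )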