@statico
Created May 28, 2023 03:11
AWS Kinesis Firehose + S3 bucket to ClickHouse import Lambda
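/* global fetch */
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3'

// Note: this client is not used by the handler below. ClickHouse reads the
// object itself through its s3() table function.
const s3 = new S3Client({ region: 'us-east-2' })

// Triggered by an S3 "object created" notification (Firehose writes the
// objects). Asks ClickHouse to import the new object into ads_events.
export const handler = async (event, context) => {
  // console.log('Received event:', JSON.stringify(event, null, 2))

  // Only the first record of the notification is processed.
  const record = event.Records[0]
  const bucket = record.s3.bucket.name
  // Keys in S3 events are URL-encoded, with spaces sent as '+'.
  const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '))
  const region = record.awsRegion

  // Send the INSERT ... SELECT over the ClickHouse HTTPS interface.
  const res = await fetch(
    `https://${process.env.CLICKHOUSE_HOST}:8443?database=${process.env.CLICKHOUSE_DATABASE}`,
    {
      method: 'POST',
      headers: {
        'X-ClickHouse-User': process.env.CLICKHOUSE_USERNAME,
        'X-ClickHouse-Key': process.env.CLICKHOUSE_PASSWORD,
      },
      // The AWS credentials are passed to ClickHouse so it can fetch the object.
      body: `
        insert into ${process.env.CLICKHOUSE_DATABASE}.ads_events
        select *
        from s3(
          'https://${bucket}.s3.${region}.amazonaws.com/${key}',
          '${process.env.CLICKHOUSE_AWS_ACCESS_KEY_ID}',
          '${process.env.CLICKHOUSE_AWS_SECRET_ACCESS_KEY}',
          'CSV'
        );
      `,
    }
  )

  // Throw on any non-200 response so Lambda records the invocation as failed.
  if (res.status !== 200) {
    throw new Error(`ClickHouse import failed: ${res.status} ${res.statusText} - ${await res.text()}`)
  }
}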
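
The handler above reads only `event.Records[0]` and interpolates the object key straight into the SQL text. The sketch below shows one possible way to cover every record in the event and to quote values as ClickHouse string literals. The helper names `sqlString`, `objectUrlsFromEvent`, and `buildInsertQuery` are hypothetical and not part of the original gist, and the table name is passed in rather than assumed.

```javascript
// Hypothetical helpers (not from the original gist).

// Quote a value as a single-quoted ClickHouse string literal,
// escaping backslashes and single quotes.
function sqlString(value) {
  return "'" + String(value).replace(/\\/g, '\\\\').replace(/'/g, "\\'") + "'"
}

// Build one virtual-hosted-style S3 URL per record in the notification.
function objectUrlsFromEvent(event) {
  return event.Records.map((record) => {
    const bucket = record.s3.bucket.name
    // Keys in S3 events are URL-encoded, with spaces sent as '+'.
    const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '))
    return `https://${bucket}.s3.${record.awsRegion}.amazonaws.com/${key}`
  })
}

// Build the same INSERT ... SELECT statement as the handler, with quoted arguments.
function buildInsertQuery({ database, table, url, accessKeyId, secretAccessKey }) {
  return [
    `insert into ${database}.${table}`,
    'select *',
    `from s3(${sqlString(url)}, ${sqlString(accessKeyId)}, ${sqlString(secretAccessKey)}, 'CSV')`,
  ].join('\n')
}
```

In the handler, one could loop over `objectUrlsFromEvent(event)` and send one POST per URL with `buildInsertQuery(...)` as the request body. Whether a single failed record should abort the whole invocation is a design choice this sketch leaves open.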
statico commented May 28, 2023

Public domain
