Skip to content

Instantly share code, notes, and snippets.

@Ankcorn
Created June 1, 2025 16:53
Show Gist options
  • Select an option

  • Save Ankcorn/966aa24999d19773a9eece1e91a92847 to your computer and use it in GitHub Desktop.

Select an option

Save Ankcorn/966aa24999d19773a9eece1e91a92847 to your computer and use it in GitHub Desktop.
Loads json.gz files into clickouse as array pairs
import fs from 'fs'
import zlib from 'zlib'
import readline from 'readline'
import { flatten } from 'flat';
const CLICKHOUSE_CONFIG = {
url: 'http://localhost:8123',
database: 'default',
username: 'default',
password: '',
};
async function* readFileToClickhouseEntry(files: string[]) {
for (const file of files) {
const readStream = fs.createReadStream(file);
const gunzip = zlib.createGunzip();
const rl = readline.createInterface({
input: readStream.pipe(gunzip),
crlfDelay: Infinity
});
const itemInsertStatements: string[] = []
rl.on('line', (line) => {
try {
const jsonObject = JSON.parse(line);
const flatObject = flatten(jsonObject)
const stringNames: string[] = [];
const stringValues: string[] = [];
const numberNames: string[] = [];
const numberValues: number[] = [];
const booleanNames: string[] = [];
const booleanValues: boolean[] = [];
for (const [key, value] of Object.entries(flatObject || {})) {
switch (typeof value) {
case 'string':
stringNames.push(key);
stringValues.push(value)
break;
case 'number':
numberNames.push(key);
numberValues.push(value);
break;
case 'boolean':
booleanNames.push(key);
booleanValues.push(value);
}
}
const escapeString = (str) => {
return str.replace(/\\/g, '\\\\') // Escape backslashes
.replace(/'/g, "\\'") // Escape single quotes
.replace(/\n/g, '\\n') // Escape newlines
.replace(/\r/g, '\\r') // Escape carriage returns
.replace(/\t/g, '\\t'); // Escape tabs
};
const formatArray = (arr) => {
if (arr.length === 0) return '[]';
return `[${arr.map(item => {
if (typeof item === 'string') {
return `'${escapeString(item)}'`;
} else if (typeof item === 'number') {
return isNaN(item) || !isFinite(item) ? 'NULL' : item;
} else if (typeof item === 'boolean') {
return item ? 1 : 0;
} else {
return 'NULL';
}
}).join(',')}]`;
};
const rowData = `(${formatArray(stringNames)}, ${formatArray(stringValues)}, ${formatArray(numberNames)}, ${formatArray(numberValues)}, ${formatArray(booleanNames)}, ${formatArray(booleanValues)})`;
itemInsertStatements.push(rowData);
} catch (error) {
console.error('Invalid JSON line:', line);
}
});
const result = await new Promise((resolve, reject) => {
rl.on('close', async () => {
resolve(itemInsertStatements)
});
rl.on('error', (err) => {
console.error('Error:', err);
reject(err)
});
})
yield result as string[]
}
}
async function insertToClickhouse(statement: string) {
const url = `${CLICKHOUSE_CONFIG.url}/?database=${CLICKHOUSE_CONFIG.database}`;
const headers = {
'Content-Type': 'text/plain',
};
const response = await fetch(url, {
method: 'POST',
headers: headers,
body: statement,
});
if (response.ok) {
const result = await response.text();
console.log(`✅ Successfully inserted`);
if (result.trim()) {
console.log('Response:', result);
}
} else {
const errorText = await response.text();
console.error('❌ ClickHouse batch insert failed:');
console.error('Status:', response.status, response.statusText);
console.error('Error:', errorText);
}
}
async function main() {
let counter = 0
const files = fs.readdirSync('.')
const blueskyData = files.filter(el => el.endsWith('json.gz'))
for await(const insertData of readFileToClickhouseEntry(blueskyData)) {
/**
* insertData is 1 million rows to insert to clickhouse
* Insert into clickhouse 100_000 rows at a time to avoid string parsing issues
*/
while (insertData.length) {
const batch = insertData.splice(0, 100000)
const insertStatement = `INSERT INTO bluesky_kv (string_names, string_values, num_names, num_values, bool_names, bool_values) VALUES
${batch.join(',\n')};`;
await insertToClickhouse(insertStatement)
}
counter++
console.log(`Writen file ${counter} of 100`)
}
}
main();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment