Skip to content

Instantly share code, notes, and snippets.


slvrtrn/out Secret

Created November 1, 2023 22:20
Show Gist options
  • Save slvrtrn/f920f042dea8e6ebfdb9728b124a5b71 to your computer and use it in GitHub Desktop.
Save slvrtrn/f920f042dea8e6ebfdb9728b124a5b71 to your computer and use it in GitHub Desktop.
inserting test data 0
inserting test data 1
inserting test data 2
inserting test data 3
inserting test data 4
inserting test data 5
inserting test data 6
inserting test data 7
inserting test data 8
inserting test data 9
streaming in test data
streaming in test data
inserting test data 0
inserting test data 1
inserting test data 2
inserting test data 3
inserting test data 4
inserting test data 5
inserting test data 6
inserting test data 7
inserting test data 8
inserting test data 9
inserting test data 0
inserting test data 1
inserting test data 2
inserting test data 3
inserting test data 4
inserting test data 5
inserting test data 6
inserting test data 7
inserting test data 8
inserting test data 9
count rows in test_table [ { 'count()': '2030' } ]
clickhouse cleanup
clickhouse cleanup done
import {
} from '@clickhouse/client'
import process from 'node:process'
import Stream from 'node:stream'
/**
 * Test harness around a `@clickhouse/client` client plus an object-mode
 * `Stream.Readable` that is handed to the client as insert `values`.
 *
 * NOTE(review): this listing looks like it lost lines during extraction
 * (closing braces, the client method names after `this.#client`, and the end of
 * the SQL template literal in `init`), so the summary below covers only what is
 * visible — confirm against the original file.
 *
 * Members, as far as the visible lines show:
 * - `init()`: awaits a client call whose `query` lists the columns `id UInt32`,
 *   `name String`, `time DateTime64` with `ENGINE = Memory`; a rejection is
 *   logged through `console.error` by the attached `.catch`.
 *   Presumably a CREATE TABLE for `test_table` — the statement head is not shown.
 * - `createPromise(table)`: awaits `#insertPromise` when one is already set,
 *   replaces `#stream` with a new stream, and stores in `#insertPromise` a client
 *   call that receives `values: this.#stream` and `format: 'JSONEachRow'`, with a
 *   `.catch` handler attached.
 * - `createStream()`: returns a `Stream.Readable` with `objectMode: true` and a
 *   no-op `read()`.
 * - `streamTestData()`: logs 'streaming in test data' and loops `index` from 0
 *   to 999 building rows with `id: index` and `name: 'test'` (presumably pushed
 *   into `#stream` — the call itself is not visible).
 * - `insertTestData()`: for `index` 0 to 9, logs the index and awaits a client
 *   call against `'test_table'` with a `values` array in `'JSONEachRow'` format;
 *   a 5000 ms `setTimeout` wait follows.
 * - `closeStreams()`: logs the result of `SELECT count(*) FROM test_table`,
 *   awaits `#insertPromise` when set, then closes the client. The
 *   `stream.destroy()` call is commented out.
 */
export class ClickHouse {
// Result promise of the stream-backed client call started in createPromise();
// undefined until createPromise() has run.
#insertPromise: Promise<InsertResult> | undefined
// Client created in the constructor and used for every call made by this class.
#client: ClickHouseClient<Stream.Readable>
// Current object-mode readable; replaced with a fresh one in createPromise().
#stream: Stream.Readable
// Creates the initial stream and the client. Host and password are read from
// the CLICKHOUSE_HOST and CLICKHOUSE_SECRET environment variables, the username
// is fixed to 'default', and keep_alive is configured with `enabled: false`.
constructor() {
this.#stream = this.createStream()
this.#client = createClient({
host: process.env.CLICKHOUSE_HOST,
username: 'default',
password: process.env.CLICKHOUSE_SECRET,
keep_alive: {
enabled: false,
// Alternative keep-alive settings, left disabled:
// socket_ttl: 2500,
// retry_on_expired_socket: true,
// Awaits a client call carrying the SQL below; errors are logged, not rethrown.
async init() {
await this.#client
query: `
id UInt32,
name String,
time DateTime64
) ENGINE = Memory
.catch((error) => console.error('⚠️ clickhouse init error:', error))
async createPromise(table: string) {
if (this.#insertPromise !== undefined) {
await this.#insertPromise
this.#stream = this.createStream()
this.#insertPromise = this.#client
values: this.#stream,
format: 'JSONEachRow',
.catch(async (error) => {
createStream() {
return new Stream.Readable({
objectMode: true,
read() {},
async streamTestData() {
console.log('streaming in test data')
for (let index = 0; index < 1000; index++) {
id: index,
name: 'test',
// insert data using INSERT (not stream) with 5 sec sleep in between
async insertTestData() {
for (let index = 0; index < 10; index++) {
console.log('inserting test data', index)
await this.#client
table: 'test_table',
values: [
id: index,
name: 'test',
format: 'JSONEachRow',
await new Promise((resolve) => setTimeout(resolve, 5_000))
async closeStreams() {
// count rows in test_table
const { data } = await (
await this.#client.query({ query: 'SELECT count(*) FROM test_table' })
).json<{ data: unknown }>()
console.log('count rows in test_table', data)
// close stream
console.log('clickhouse cleanup')
// stream.destroy()
// when the stream is closed, the insert stream can be awaited
if (this.#insertPromise !== undefined) {
await this.#insertPromise
await this.#client.close()
console.log('clickhouse cleanup done')
// Driver script: defines four scenarios against one shared ClickHouse instance,
// then runs init(), test1..test4 in sequence, and finally closeStreams().
// The pass/fail remarks on each scenario are the author's recorded observations.
// `void` marks the async IIFE's promise as intentionally not awaited.
void (async () => {
// Scenario 1: plain inserts only (the stream-related calls are commented out).
// this passes
const test1 = async () => {
// await clickhouse.createPromise('test_table')
// await clickhouse.streamTestData()
await clickhouse.insertTestData()
// Scenario 2: start the stream-backed insert and stream rows; no plain inserts.
// this passes
const test2 = async () => {
await clickhouse.createPromise('test_table')
await clickhouse.streamTestData()
// await clickhouse.insertTestData()
// Scenario 3: stream-backed insert, streamed rows, then the plain inserts.
// this fails with SOCKET_TIMEOUT
const test3 = async () => {
await clickhouse.createPromise('test_table')
await clickhouse.streamTestData()
await clickhouse.insertTestData()
// Scenario 4: stream-backed insert started but no rows streamed, then the
// plain inserts.
// this fails with Error: socket hang up ECONNRESET
const test4 = async () => {
await clickhouse.createPromise('test_table')
// await clickhouse.streamTestData()
await clickhouse.insertTestData()
// The scenarios above close over this single instance, so they share one
// client and run one after another rather than in isolation.
const clickhouse = new ClickHouse()
await clickhouse.init()
await test1()
await test2()
await test3()
await test4()
await clickhouse.closeStreams()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment