Skip to content

Instantly share code, notes, and snippets.

@slvrtrn

slvrtrn/index.ts Secret

Created November 2, 2023 01:16
Show Gist options
  • Save slvrtrn/7fb24918661e9b5066a131f32b194ca1 to your computer and use it in GitHub Desktop.
Save slvrtrn/7fb24918661e9b5066a131f32b194ca1 to your computer and use it in GitHub Desktop.
import {
ClickHouseClient,
createClient,
InsertResult,
} from '@clickhouse/client'
import { ClickHouseLogLevel } from '@clickhouse/client-common'
import process from 'node:process'
import Stream from 'node:stream'
export class ClickHouse {
#insertPromise: Promise<InsertResult> | undefined
#client: ClickHouseClient<Stream.Readable>
#stream: Stream.Readable
constructor() {
this.#stream = this.createStream()
this.#client = createClient({
host: process.env.CLICKHOUSE_HOST,
username: 'default',
password: process.env.CLICKHOUSE_SECRET,
keep_alive: {
enabled: true,
idle_socket_ttl: 2500,
},
log: {
level: ClickHouseLogLevel.TRACE,
},
})
}
async init() {
await this.#client
.command({
query: `
CREATE OR REPLACE TABLE test_table
(
id UInt32,
name String,
time DateTime64
) ENGINE = Memory
`,
})
.catch((error) => console.error('⚠️ clickhouse init error:', error))
}
async createPromise(table: string) {
if (this.#insertPromise !== undefined) {
console.log('Closing the existing stream')
this.#stream.push(null)
console.log('Awaiting the insert promise')
await this.#insertPromise
console.log('Creating a new stream')
this.#stream = this.createStream()
}
console.log('Creating a new insert promise')
this.#insertPromise = this.#client
.insert({
table,
values: this.#stream,
format: 'JSONEachRow',
})
.catch(async (error) => {
console.error(error)
process.exit(255)
})
}
createStream() {
return new Stream.Readable({
objectMode: true,
read() {},
})
}
async streamTestData(n? :number) {
console.log('streaming in test data')
for (let index = 0; index < (n ?? 1000); index++) {
this.#stream.push({
id: index,
name: 'test',
time: Date.now(),
})
}
}
// insert data using INSERT (not stream) with 5 sec sleep in between
async insertTestData() {
for (let index = 0; index < 2; index++) {
console.log('inserting test data', index)
await this.#client
.insert({
table: 'test_table',
values: [
{
id: index,
time: Date.now(),
name: 'test',
},
],
format: 'JSONEachRow',
})
.catch(console.error)
await new Promise((resolve) => setTimeout(resolve, 5_000))
}
}
async closeStreams() {
// count rows in test_table
const { data } = await (
await this.#client.query({ query: 'SELECT count(*) FROM test_table' })
).json<{ data: unknown }>()
console.log('count rows in test_table', data)
// close stream
console.log('clickhouse cleanup')
this.#stream.push(null)
// stream.destroy()
// when the stream is closed, the insert stream can be awaited
if (this.#insertPromise !== undefined) {
await this.#insertPromise
}
await this.#client.close()
console.log('clickhouse cleanup done')
}
}
void (async () => {
// this passes
const test1 = async () => {
console.log('TEST #1')
// await clickhouse.createPromise('test_table')
// await clickhouse.streamTestData()
await clickhouse.insertTestData()
}
// this passes
const test2 = async () => {
console.log('TEST #2')
await clickhouse.createPromise('test_table')
await clickhouse.streamTestData()
// await clickhouse.insertTestData()
}
// this fails with SOCKET_TIMEOUT
const test3 = async () => {
console.log('TEST #3')
await clickhouse.createPromise('test_table')
await clickhouse.streamTestData()
await clickhouse.insertTestData()
}
// this fails with Error: socket hang up ECONNRESET
const test4 = async () => {
console.log('TEST #4')
await clickhouse.createPromise('test_table')
await clickhouse.streamTestData(1)
await clickhouse.insertTestData()
}
const clickhouse = new ClickHouse()
await clickhouse.init()
// await test1()
// await test2()
await test3()
await test4()
await clickhouse.closeStreams()
})()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment