-
-
Save slvrtrn/7fb24918661e9b5066a131f32b194ca1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
ClickHouseClient, | |
createClient, | |
InsertResult, | |
} from '@clickhouse/client' | |
import { ClickHouseLogLevel } from '@clickhouse/client-common' | |
import process from 'node:process' | |
import Stream from 'node:stream' | |
export class ClickHouse { | |
#insertPromise: Promise<InsertResult> | undefined | |
#client: ClickHouseClient<Stream.Readable> | |
#stream: Stream.Readable | |
constructor() { | |
this.#stream = this.createStream() | |
this.#client = createClient({ | |
host: process.env.CLICKHOUSE_HOST, | |
username: 'default', | |
password: process.env.CLICKHOUSE_SECRET, | |
keep_alive: { | |
enabled: true, | |
idle_socket_ttl: 2500, | |
}, | |
log: { | |
level: ClickHouseLogLevel.TRACE, | |
}, | |
}) | |
} | |
async init() { | |
await this.#client | |
.command({ | |
query: ` | |
CREATE OR REPLACE TABLE test_table | |
( | |
id UInt32, | |
name String, | |
time DateTime64 | |
) ENGINE = Memory | |
`, | |
}) | |
.catch((error) => console.error('⚠️ clickhouse init error:', error)) | |
} | |
async createPromise(table: string) { | |
if (this.#insertPromise !== undefined) { | |
console.log('Closing the existing stream') | |
this.#stream.push(null) | |
console.log('Awaiting the insert promise') | |
await this.#insertPromise | |
console.log('Creating a new stream') | |
this.#stream = this.createStream() | |
} | |
console.log('Creating a new insert promise') | |
this.#insertPromise = this.#client | |
.insert({ | |
table, | |
values: this.#stream, | |
format: 'JSONEachRow', | |
}) | |
.catch(async (error) => { | |
console.error(error) | |
process.exit(255) | |
}) | |
} | |
createStream() { | |
return new Stream.Readable({ | |
objectMode: true, | |
read() {}, | |
}) | |
} | |
async streamTestData(n? :number) { | |
console.log('streaming in test data') | |
for (let index = 0; index < (n ?? 1000); index++) { | |
this.#stream.push({ | |
id: index, | |
name: 'test', | |
time: Date.now(), | |
}) | |
} | |
} | |
// insert data using INSERT (not stream) with 5 sec sleep in between | |
async insertTestData() { | |
for (let index = 0; index < 2; index++) { | |
console.log('inserting test data', index) | |
await this.#client | |
.insert({ | |
table: 'test_table', | |
values: [ | |
{ | |
id: index, | |
time: Date.now(), | |
name: 'test', | |
}, | |
], | |
format: 'JSONEachRow', | |
}) | |
.catch(console.error) | |
await new Promise((resolve) => setTimeout(resolve, 5_000)) | |
} | |
} | |
async closeStreams() { | |
// count rows in test_table | |
const { data } = await ( | |
await this.#client.query({ query: 'SELECT count(*) FROM test_table' }) | |
).json<{ data: unknown }>() | |
console.log('count rows in test_table', data) | |
// close stream | |
console.log('clickhouse cleanup') | |
this.#stream.push(null) | |
// stream.destroy() | |
// when the stream is closed, the insert stream can be awaited | |
if (this.#insertPromise !== undefined) { | |
await this.#insertPromise | |
} | |
await this.#client.close() | |
console.log('clickhouse cleanup done') | |
} | |
} | |
void (async () => { | |
// this passes | |
const test1 = async () => { | |
console.log('TEST #1') | |
// await clickhouse.createPromise('test_table') | |
// await clickhouse.streamTestData() | |
await clickhouse.insertTestData() | |
} | |
// this passes | |
const test2 = async () => { | |
console.log('TEST #2') | |
await clickhouse.createPromise('test_table') | |
await clickhouse.streamTestData() | |
// await clickhouse.insertTestData() | |
} | |
// this fails with SOCKET_TIMEOUT | |
const test3 = async () => { | |
console.log('TEST #3') | |
await clickhouse.createPromise('test_table') | |
await clickhouse.streamTestData() | |
await clickhouse.insertTestData() | |
} | |
// this fails with Error: socket hang up ECONNRESET | |
const test4 = async () => { | |
console.log('TEST #4') | |
await clickhouse.createPromise('test_table') | |
await clickhouse.streamTestData(1) | |
await clickhouse.insertTestData() | |
} | |
const clickhouse = new ClickHouse() | |
await clickhouse.init() | |
// await test1() | |
// await test2() | |
await test3() | |
await test4() | |
await clickhouse.closeStreams() | |
})() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment