@dbist
Last active May 18, 2023 16:50

Using DuckDB with CockroachDB


This is a fun experiment in using DuckDB to parse CockroachDB change data capture (CDC) output and to query CockroachDB tables from DuckDB.



Motivation

CockroachDB has native support for change data capture (CDC) and can write to object storage sinks on all major cloud providers. At the time of writing, the supported output formats include Avro and newline-delimited JSON. Until now I have avoided newline-delimited JSON because I don't find it easy to work with. Today I'd like to see whether DuckDB is a viable tool for parsing CDC output in that format.

High Level Steps

  • Start a CockroachDB cluster
  • Parse CockroachDB newline-delimited changefeed output using DuckDB
  • Query CockroachDB tables using DuckDB
  • Conclusion

Step by step instructions

Start a CockroachDB cluster

I am using a serverless instance of CockroachDB. It has enterprise change feeds enabled by default. You can sign up for a free instance here.

Parse CockroachDB newline-delimited changefeed output using DuckDB

We're going to follow the example to send sample data to an S3 bucket. DuckDB supports reading from S3 directly but today I'm going to download files to my machine and parse them locally.

I'm using the tpcc workload to generate changefeed data but you can use the example in the doc above.

Initialize

cockroach workload init tpcc \
 --warehouses 100 $DATABASE_URL

Execute the workload

cockroach workload run tpcc \
 --duration=120m \
 --concurrency=3 \
 --max-rate=1000 \
 --tolerate-errors \
 --warehouses=10 \
 --conns 60 \
 --ramp=1m \
 --workers=100 \
 $DATABASE_URL

Create a changefeed job

CREATE CHANGEFEED FOR TABLE history INTO 's3://artemawsbucket/tpcc/history?AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID>&AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY>' with updated;
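
AWS secret keys often contain characters such as / and +, which can break the query string of the sink URI unless they are percent-encoded. The sketch below shows one way to build the URI safely. The helper name s3_sink_uri and the credential values are hypothetical and used only for illustration.

```python
from urllib.parse import quote


def s3_sink_uri(bucket: str, path: str, access_key_id: str, secret_access_key: str) -> str:
    """Build an S3 sink URI with percent-encoded credentials (hypothetical helper)."""
    return (
        f"s3://{bucket}/{path}"
        f"?AWS_ACCESS_KEY_ID={quote(access_key_id, safe='')}"
        f"&AWS_SECRET_ACCESS_KEY={quote(secret_access_key, safe='')}"
    )


# Placeholder credentials, not real keys.
uri = s3_sink_uri("artemawsbucket", "tpcc/history", "AKIAEXAMPLE", "abc/def+ghi")
print(uri)
```

The resulting string can be pasted between the single quotes of the INTO clause.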

Then, navigate to your S3 bucket and find the files there.

Copy data from S3 to your filesystem

aws s3 cp s3://artemawsbucket/tpcc/history . --recursive

Install duckdb

brew install duckdb

Finally, navigate to the directory with the json files and start duckdb.

duckdb

Of the available JSON functions, the standard read_json_objects function works on a single file

SELECT * FROM read_json_objects('202305161404194891609990000000000-fb5d1ff7b5a47331-2-15-00000000-history-a.ndjson');
│ {"after": {"h_amount": 10.00, "h_c_d_id": 8, "h_c_id": 2404, "h_c_w_id": 1, "h_d_id": 8, "h_data": "9v3L5bOacQHehuVoJHJ2vp…  │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 8, "h_c_id": 2431, "h_c_w_id": 1, "h_d_id": 8, "h_data": "ljve8BmeEvbQ5dJWLgvcp"…  │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 8, "h_c_id": 2382, "h_c_w_id": 1, "h_d_id": 8, "h_data": "ve8BmeEvbQ5dJWLgvcp", …  │

Similarly, there's a newline-delimited function, read_ndjson_objects. This time we're going to use globbing instead of naming an individual file. We're also going to limit the output, as my entire dataset is 3 million rows.

SELECT * FROM read_ndjson_objects('*.ndjson') LIMIT 5;
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                             json                                                             │
│                                                             json                                                             │
├──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1166, "h_c_w_id": 25, "h_d_id": 10, "h_data": "Z5x9v3L5bOacQHehuVo…  │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1181, "h_c_w_id": 25, "h_d_id": 10, "h_data": "3L5bOacQHehuVoJHJ2v…  │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1171, "h_c_w_id": 25, "h_d_id": 10, "h_data": "L5bOacQHehuVoJHJ2vp…  │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1188, "h_c_w_id": 25, "h_d_id": 10, "h_data": "cQHehuVoJHJ2vp", "h…  │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1184, "h_c_w_id": 25, "h_d_id": 10, "h_data": "VzccrxcAzZ5x9v3L5b"…  │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
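
For readers less familiar with the format, here is a minimal Python sketch of what reading newline-delimited JSON across a glob amounts to: one JSON object per line, across every matching file. It is not how DuckDB implements read_ndjson_objects, and the function name iter_ndjson is made up for this example.

```python
import glob
import json
from itertools import islice
from typing import Iterator


def iter_ndjson(pattern: str) -> Iterator[dict]:
    """Yield one parsed object per non-empty line across all files matching pattern."""
    for path in sorted(glob.glob(pattern)):
        with open(path, encoding="utf-8") as fh:
            for line in fh:
                if line.strip():  # skip blank lines
                    yield json.loads(line)


# Rough equivalent of LIMIT 5: stop after the first five records.
for record in islice(iter_ndjson("*.ndjson"), 5):
    print(record)
```

Because the function is a generator, stopping after five records does not require reading the whole dataset.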

We can create a DuckDB table out of the json files

CREATE TABLE history AS SELECT * FROM read_ndjson_objects('*.ndjson');
show tables;
┌─────────┐
│  name   │
│ varchar │
├─────────┤
│ history │
└─────────┘
select json as col from history limit 5;
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                             col                                                              │
│                                                             json                                                             │
├──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1166, "h_c_w_id": 25, "h_d_id": 10, "h_data": "Z5x9v3L5bOacQHehuVo…  │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1181, "h_c_w_id": 25, "h_d_id": 10, "h_data": "3L5bOacQHehuVoJHJ2v…  │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1171, "h_c_w_id": 25, "h_d_id": 10, "h_data": "L5bOacQHehuVoJHJ2vp…  │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1188, "h_c_w_id": 25, "h_d_id": 10, "h_data": "cQHehuVoJHJ2vp", "h…  │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1184, "h_c_w_id": 25, "h_d_id": 10, "h_data": "VzccrxcAzZ5x9v3L5b"…  │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

We can query the individual columns

select json->'after'->'h_amount' from history limit 1;
┌─────────────────────────────────┐
│ "json" -> 'after' -> 'h_amount' │
│              json               │
├─────────────────────────────────┤
│ 10.0                            │
└─────────────────────────────────┘

We can cast too

select json->'after'->'h_data', cast (json->'after'->'h_c_id' as integer) as c_id from history where c_id > 2000 limit 5;
┌───────────────────────────────┬───────┐
│ "json" -> 'after' -> 'h_data' │ c_id  │
│             json              │ int32 │
├───────────────────────────────┼───────┤
│ "7xrljve8BmeEvbQ5dJW"         │  2002 │
│ "AzZ5x9v3L5bOac"              │  2001 │
│ "x9v3L5bOacQHehuVoJ"          │  2024 │
│ "2vp7xrljve8Bme"              │  2006 │
│ "UtEdpJzCGyo91sT"             │  2029 │
└───────────────────────────────┴───────┘

We can use ->> notation to output values as varchar instead of json

SELECT distinct(cast (json->>'after'->>'h_amount' as float)) FROM history LIMIT 5;
┌──────────────────────────────────────────────────────┐
│ CAST((("json" ->> 'after') ->> 'h_amount') AS FLOAT) │
│                        float                         │
├──────────────────────────────────────────────────────┤
│                                                 10.0 │
│                                              2612.12 │
│                                              3986.51 │
│                                              2836.18 │
│                                                359.5 │
└──────────────────────────────────────────────────────┘
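
The difference between -> and ->> is easy to miss: the first keeps the value as JSON (strings stay wrapped in double quotes, as in the h_data output earlier), while the second returns plain text. The Python snippet below is only a loose analogy, not DuckDB's implementation, and the sample record is an abbreviated, hypothetical combination of fields seen above.

```python
import json

# Abbreviated, hypothetical record using field names from the changefeed output.
doc = json.loads('{"after": {"h_amount": 10.00, "h_data": "7xrljve8BmeEvbQ5dJW"}}')

# Analogous to json->'after'->'h_data': the value is still JSON, quotes included.
as_json = json.dumps(doc["after"]["h_data"])

# Analogous to json->>'after'->>'h_data': the value as bare text.
as_text = str(doc["after"]["h_data"])

# Analogous to casting the extracted text to a float.
amount = float(json.dumps(doc["after"]["h_amount"]))

print(as_json, as_text, amount)
```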

Another useful JSON function is read_json_auto. It infers the column types automatically.

SELECT * FROM read_json_auto('*.ndjson');
┌──────────────────────────────────────────────┬──────────────────────────────────────────────┬────────────────────────────────┐
│                    after                     │                     key                      │            updated             │
│ struct(h_amount double, h_c_d_id ubigint, …  │                    json[]                    │            varchar             │
├──────────────────────────────────────────────┼──────────────────────────────────────────────┼────────────────────────────────┤
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "42674618-a16f-4000-8000-0000000bdfb5"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "426799fb-7793-4c00-8000-0000000bdfc4"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "4267620e-e8d1-4000-8000-0000000bdfba"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "4267c121-0eb5-4800-8000-0000000bdfcb"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "4267aac2-6f34-4400-8000-0000000bdfc7"] │ 1684245859489160999.0000000000 │
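
Note that the updated column is inferred as varchar. As far as I understand it, the value is CockroachDB's hybrid logical clock timestamp: the integer part is wall time in nanoseconds since the Unix epoch and the fractional part is a logical counter. Under that assumption, a small Python sketch can turn it into a readable timestamp. The function name hlc_to_datetime is hypothetical.

```python
from datetime import datetime, timezone
from typing import Tuple


def hlc_to_datetime(updated: str) -> Tuple[datetime, int]:
    """Split an 'updated' value into (UTC wall time, logical counter).

    Assumes the format <nanoseconds since epoch>.<logical counter>.
    Sub-microsecond digits are dropped because datetime stores microseconds.
    """
    wall_ns, _, logical = updated.partition(".")
    seconds, nanos = divmod(int(wall_ns), 1_000_000_000)
    wall = datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=nanos // 1000)
    return wall, int(logical or 0)


wall, logical = hlc_to_datetime("1684245859489160999.0000000000")
print(wall.isoformat(), logical)  # 2023-05-16T14:04:19.489160+00:00 0
```

The result lines up with the timestamp prefix of the changefeed file name used earlier (20230516140419...).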

We can drill down to the individual array index level

SELECT CAST (key->0 AS INTEGER) AS hkey FROM read_json_auto('*.ndjson') WHERE hkey = 25 LIMIT 5;
┌───────┐
│ hkey  │
│ int32 │
├───────┤
│    25 │
│    25 │
│    25 │
│    25 │
│    25 │
└───────┘

This has a lot of promise, and I will keep a close eye on it as DuckDB grows in popularity. It will definitely help in analyzing CDC output.

Query CockroachDB tables using DuckDB

DuckDB supports querying PostgreSQL directly using the PostgreSQL extension (postgres_scanner), and today I'd like to see if we can do the same with CockroachDB.

duckdb
INSTALL postgres_scanner;
LOAD postgres_scanner;

CREATE SCHEMA abc;

CALL postgres_attach('dbname=defaultdb user=artem host=hostname port=26257 password=password sslmode=verify-full sslrootcert=certlocation', source_schema='public', sink_schema='abc');
┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ 0 rows  │
└─────────┘
SELECT table_schema,table_name,table_type  FROM information_schema.tables;
┌──────────────┬──────────────────┬────────────┐
│ table_schema │    table_name    │ table_type │
│   varchar    │     varchar      │  varchar   │
├──────────────┼──────────────────┼────────────┤
│ abc          │ pgbench_tellers  │ VIEW       │
│ abc          │ pgbench_history  │ VIEW       │
│ abc          │ pgbench_branches │ VIEW       │
│ abc          │ pgbench_accounts │ VIEW       │
│ abc          │ example          │ VIEW       │
└──────────────┴──────────────────┴────────────┘
PRAGMA show_tables;
┌──────────────────┐
│       name       │
│     varchar      │
├──────────────────┤
│ example          │
│ pgbench_accounts │
│ pgbench_branches │
│ pgbench_history  │
│ pgbench_tellers  │
└──────────────────┘
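
The first argument to postgres_attach is a libpq-style keyword/value connection string, so every keyword, including sslmode and sslrootcert, has to sit inside the one quoted string. The sketch below assembles such a string from named parts, which makes a misplaced quote harder to introduce. The helper libpq_dsn is hypothetical, and the values are the same placeholders used above.

```python
def libpq_dsn(**params: str) -> str:
    """Join keyword/value pairs into one libpq-style connection string (hypothetical helper)."""
    parts = []
    for key, value in params.items():
        value = str(value)
        # libpq wants empty values or values with spaces wrapped in single quotes,
        # with backslashes and single quotes escaped by a backslash.
        if value == "" or any(ch in value for ch in " '\\"):
            value = "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"
        parts.append(f"{key}={value}")
    return " ".join(parts)


dsn = libpq_dsn(
    dbname="defaultdb",
    user="artem",
    host="hostname",
    port="26257",
    password="password",
    sslmode="verify-full",
    sslrootcert="certlocation",
)
print(dsn)
```

If a value does need quoting, remember that the single quotes must also be doubled when the string is embedded in a SQL literal.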

Query the tables directly, making sure to specify the abc schema

SELECT * FROM abc.pgbench_history LIMIT 5;
Error: Invalid Error: IO Error: Unable to query Postgres: ERROR:  at or near "(": syntax error
DETAIL:  source SQL:
COPY (SELECT "tid", "bid", "tbalance", "filler" FROM "public"."pgbench_tellers" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid ) TO STDOUT (FORMAT binary)
     ^
 ERROR:  at or near "(": syntax error
DETAIL:  source SQL:
COPY (SELECT "tid", "bid", "tbalance", "filler" FROM "public"."pgbench_tellers" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid ) TO STDOUT (FORMAT binary)

This is where it starts to break. As the error shows, the postgres_scanner fetches rows with COPY ... TO STDOUT (FORMAT binary). In CockroachDB 23.1, the COPY command works with the text and csv formats only. I've filed issues 1, 2, 3 to add support for binary, json and parquet.

demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT csv);                                     
1
2
3
4
5
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT text);               
1
2
3
4
5
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT binary);                                  
ERROR: unimplemented: binary format for COPY TO not implemented
SQLSTATE: 0A000
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/97180/v23.1

demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT json);                                    
invalid syntax: statement ignored: at or near "json": syntax error: unimplemented: this syntax
SQLSTATE: 0A000
DETAIL: source SQL:
COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT json)
                                                    ^
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/96590/v23.1

demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT parquet);                                 
invalid syntax: statement ignored: at or near "parquet": syntax error: unimplemented: this syntax
SQLSTATE: 0A000
DETAIL: source SQL:
COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT parquet)
                                                    ^
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/96590/v23.1

Unfortunately, the postgres_scanner does not work with the text or csv formats, or at least I haven't found a way to make it work.

D COPY (SELECT * FROM abc.test) TO STDOUT (FORMAT csv);
Error: Invalid Error: IO Error: Unable to query Postgres: SSL SYSCALL error: EOF detected
 SSL SYSCALL error: EOF detected
D COPY (SELECT * FROM abc.test) TO STDOUT (FORMAT text);
Error: Catalog Error: Copy Function with name text does not exist!
Did you mean "parquet"?
D COPY (SELECT * FROM abc.test) TO STDOUT (FORMAT parquet);
Error: Invalid Error: IO Error: Unable to query Postgres: SSL SYSCALL error: EOF detected
 SSL SYSCALL error: EOF detected

Conclusion

Your mileage will vary. This was a fun experiment, and I will be paying close attention as this project matures.
