@dbist
Last active August 30, 2023 15:23

Emitting Protocol Buffers with CockroachDB CDC Queries


As of this writing, CockroachDB does not officially support Protocol Buffers (protobuf) payloads in change data capture (CDC). Today, I am going to demonstrate a feature that may or may not land in CockroachDB proper; the product team is evaluating its viability before we can officially support it.


Previous articles on CockroachDB CDC


Motivation

Protocol Buffers are a language-neutral, platform-neutral, extensible mechanism for serializing structured data, and they are a common choice for platforms that pass messages between systems. CockroachDB is a distributed SQL database built on a transactional and strongly-consistent key-value store. It scales horizontally; survives disk, machine, rack, and even datacenter failures with minimal latency disruption and no manual intervention; supports strongly-consistent ACID transactions; and provides a familiar SQL API for structuring, manipulating, and querying data.

Although CockroachDB uses Protocol Buffers extensively in its own code, changefeeds have no official support for emitting them. A recent customer conversation led to this experiment, in which I use several recent features to serialize CockroachDB rows to protobuf and emit them via CDC Queries. This is also our first look at CDC Queries, a new and flexible way to express CockroachDB changefeed streams in SQL.

This tutorial assumes you have an enterprise license. Because the feature in this tutorial is not part of any Cockroach Labs offering, you have to follow the steps exactly as described to build from the right source code.

High Level Steps

  • Build CockroachDB with the Protocol Buffers function
  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy a Kafka Consumer
  • Verify
  • Conclusion

Step-by-step instructions

Build CockroachDB with the Protocol Buffers function

Before I show you how to get this working, I'd like to thank Yevgeniy Miretskiy of the CDC team, who built this capability and mentored me in getting it working. The source code for the feature is available in the pull request checked out below. For brevity, I will skip the steps to set up a build environment.

Check out the pull request (from within a clone of the CockroachDB repository):

gh pr checkout 89955

Run the preliminary checks for building CockroachDB from source:

./dev doctor

Finally, build the code:

bazel build pkg/cmd/cockroach-short

Navigate to the directory with the built binary:

cd _bazel/bin/pkg/cmd/cockroach

Deploy a CockroachDB cluster with enterprise changefeeds

Start a single-node instance of CockroachDB using the built binary:

./cockroach start-single-node --insecure --background

To enable enterprise changefeeds, execute the following statements from a SQL shell (for example, ./cockroach sql --insecure):

SET CLUSTER SETTING cluster.organization = '<organization name>';

SET CLUSTER SETTING enterprise.license = '<secret>';

SET CLUSTER SETTING kv.rangefeed.enabled = true;

Generate sample data

CREATE TABLE office_dogs (
     id INT PRIMARY KEY,
     name STRING);

INSERT INTO office_dogs VALUES
   (1, 'Petee'),
   (2, 'Carl');

UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
SELECT * FROM office_dogs;
  id |  name
-----+----------
   1 | Petee H
   2 | Carl

The function we are going to use to convert rows to Protocol Buffers is crdb_internal.row_to_proto(). With the pull request checked out above, this function is available for querying:

SELECT crdb_internal.row_to_proto(office_dogs) FROM office_dogs;
                          crdb_internal.row_to_proto
------------------------------------------------------------------------------
  \x0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f
  \x0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040

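It takes the row and serializes it as a protobuf message. We can decode the row back into human-readable form with crdb_internal.pb_to_json(), which interprets the bytes as a google.protobuf.Struct. Drop the leading \x from the output and pass the remaining hex string to decode(..., 'hex'):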

SELECT crdb_internal.pb_to_json('google.protobuf.Struct',
decode('0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f', 'hex')) AS proto;
             proto
--------------------------------
  {"id": 1, "name": "Petee H"}
SELECT crdb_internal.pb_to_json('google.protobuf.Struct',
decode('0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040', 'hex')) AS proto;
            proto
-----------------------------
  {"id": 2, "name": "Carl"}
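The hex strings above are google.protobuf.Struct messages, which is why pb_to_json() is called with 'google.protobuf.Struct'. To show what those bytes contain, here is a minimal Python sketch that hand-encodes a Struct and reproduces the first row's output byte for byte. It assumes, as the output suggests, that the columns appear in the order name, id and that numbers are stored as doubles; encode_struct and the other helpers are illustrative names, not part of CockroachDB, and only string and number values are handled.

```python
import struct


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def length_delimited(field_number: int, payload: bytes) -> bytes:
    """Wire type 2: tag, varint length, then the raw payload bytes."""
    tag = encode_varint((field_number << 3) | 2)
    return tag + encode_varint(len(payload)) + payload


def encode_value(value) -> bytes:
    """Encode a google.protobuf.Value holding a string or a number."""
    if isinstance(value, str):
        return length_delimited(3, value.encode("utf-8"))  # string_value = 3
    # number_value = 2, wire type 1: a little-endian 64-bit double.
    return encode_varint((2 << 3) | 1) + struct.pack("<d", float(value))


def encode_struct(fields) -> bytes:
    """Encode a google.protobuf.Struct from (key, value) pairs, in order.

    Struct's map<string, Value> (field 1) is written as one entry message
    per key, where the key is field 1 and the Value is field 2.
    """
    out = b""
    for key, value in fields:
        entry = length_delimited(1, key.encode("utf-8"))
        entry += length_delimited(2, encode_value(value))
        out += length_delimited(1, entry)
    return out


# Column order (name, then id) is assumed from the output above.
row = [("name", "Petee H"), ("id", 1)]
print(encode_struct(row).hex())
# 0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f
```

Each column becomes a map entry that carries its own name, so the id column alone costs 17 bytes even though the value is a single small integer.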

We can use this function in a CDC query. First, though, let's set up a webhook sink for a quick demonstration of changefeed queries:

git clone https://github.com/cockroachlabs/cdc-webhook-sink-test-server.git
cd cdc-webhook-sink-test-server
cd go-https-server
chmod +x server.sh
./server.sh
2023/08/30 09:39:05 starting server on port 3000

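With the basics in place, we can create a changefeed. The AS SELECT clause makes this a CDC query that emits each row serialized with crdb_internal.row_to_proto(), and the updated option adds the row's update timestamp to each message: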

CREATE CHANGEFEED INTO 'webhook-https://localhost:3000?insecure_tls_skip_verify=true'
  WITH updated
  AS SELECT crdb_internal.row_to_proto(office_dogs) AS proto FROM office_dogs;
        job_id
----------------------
  895654238351589377

Verify

In the terminal where the webhook sink is running, both existing rows arrive from the changefeed's initial scan:

2023/08/30 11:11:25 {"payload":[{"__crdb__": {"updated": "1693408285757033000.0000000000"}, "proto": "\\x0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040"}],"length":1}
2023/08/30 11:11:25 {"payload":[{"__crdb__": {"updated": "1693408285757033000.0000000000"}, "proto": "\\x0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f"}],"length":1}
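A consumer of this sink has to unwrap the JSON envelope and turn the proto column, which arrives as a "\x"-prefixed hex string, back into raw bytes before decoding it. The following Python sketch does that for one request body. It assumes the logged line is the verbatim request body and that the envelope always has the shape shown above; extract_protos is a hypothetical helper, not part of the test server.

```python
import json


def extract_protos(body: str):
    """Return (updated, proto_bytes) pairs from one webhook request body.

    Assumes the envelope shape seen in the sink's log:
    {"payload": [{"__crdb__": {"updated": ...}, "proto": "\\x..."}], "length": N}
    """
    messages = []
    for row in json.loads(body)["payload"]:
        updated = row["__crdb__"]["updated"]
        encoded = row["proto"]
        # The BYTES column arrives as a string like "\x0a0e...";
        # drop the two-character "\x" prefix before parsing the hex.
        if encoded.startswith("\\x"):
            encoded = encoded[2:]
        messages.append((updated, bytes.fromhex(encoded)))
    return messages


# First body from the log above (raw string keeps the JSON escapes intact).
body = r'{"payload":[{"__crdb__": {"updated": "1693408285757033000.0000000000"}, "proto": "\\x0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040"}],"length":1}'
for updated, proto in extract_protos(body):
    print(updated, proto.hex())
```

The resulting bytes are the same google.protobuf.Struct that pb_to_json() decodes, so they can be handed to any protobuf decoder.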

Let's update a record in the office_dogs table:

UPDATE office_dogs SET name = 'Tarzan' WHERE id = 1;

The webhook sink receives the new version of the row:

2023/08/30 11:12:58 {"payload":[{"__crdb__": {"updated": "1693408377084928000.0000000000"}, "proto": "\\x0a100a046e616d6512081a065461727a616e0a0f0a026964120911000000000000f03f"}],"length":1}

Decoding the new payload with pb_to_json() confirms the change:

SELECT crdb_internal.pb_to_json('google.protobuf.Struct', decode('0a100a046e616d6512081a065461727a616e0a0f0a026964120911000000000000f03f', 'hex')) AS proto;
             proto
-------------------------------
  {"id": 1, "name": "Tarzan"}
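The same decoding can happen outside the database, for example in a consumer of the webhook sink. Below is a minimal, dependency-free Python sketch that walks the protobuf wire format and turns a google.protobuf.Struct back into a Python dict, much like pb_to_json() does above. It is an illustration only: the helper names are hypothetical, and a production consumer would more likely use the official protobuf package (google.protobuf.struct_pb2 together with json_format).

```python
import json
import struct


def read_varint(buf: bytes, pos: int):
    """Decode a base-128 varint starting at pos; return (value, new_pos)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def read_fields(buf: bytes):
    """Yield (field_number, wire_type, raw_value) for each field in a message."""
    pos = 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        field, wire = key >> 3, key & 7
        if wire == 0:    # varint
            value, pos = read_varint(buf, pos)
        elif wire == 1:  # 64-bit
            value, pos = buf[pos:pos + 8], pos + 8
        elif wire == 2:  # length-delimited
            length, pos = read_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        elif wire == 5:  # 32-bit
            value, pos = buf[pos:pos + 4], pos + 4
        else:
            raise ValueError(f"unsupported wire type {wire}")
        yield field, wire, value


def decode_value(buf: bytes):
    """Decode a google.protobuf.Value into a Python object."""
    for field, _, raw in read_fields(buf):
        if field == 1:
            return None                         # null_value
        if field == 2:
            return struct.unpack("<d", raw)[0]  # number_value (double)
        if field == 3:
            return raw.decode("utf-8")          # string_value
        if field == 4:
            return bool(raw)                    # bool_value
        if field == 5:
            return decode_struct(raw)           # struct_value
        if field == 6:                          # list_value: repeated Value = 1
            return [decode_value(v) for _, _, v in read_fields(raw)]
    return None


def decode_struct(buf: bytes) -> dict:
    """Decode a google.protobuf.Struct (map<string, Value> fields = 1)."""
    result = {}
    for _, _, entry in read_fields(buf):
        key, value = "", b""
        for f, _, raw in read_fields(entry):
            if f == 1:
                key = raw.decode("utf-8")
            elif f == 2:
                value = raw
        result[key] = decode_value(value)
    return result


hex_payload = "0a100a046e616d6512081a065461727a616e0a0f0a026964120911000000000000f03f"
print(json.dumps(decode_struct(bytes.fromhex(hex_payload))))
# {"name": "Tarzan", "id": 1.0}
```

The sketch prints id as 1.0 because a Struct stores every number as a double; it keeps keys in wire order rather than sorting them.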

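Note that the emitted messages use the dynamically typed google.protobuf.Struct format rather than a strongly typed, schema-specific message: field names are carried in every message, and numeric values are encoded as doubles. If your use case requires strongly typed messages, that is a conversation for another time.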
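To make the difference between dynamically and strongly typed messages concrete, the Python sketch below compares the Struct bytes that row_to_proto() produced for (1, 'Petee H') with a hand-encoded message for a hypothetical OfficeDog schema (int64 id = 1; string name = 2). Nothing in this tutorial produces such a message; the schema and encode_office_dog are assumptions made purely for comparison.

```python
# Bytes emitted by crdb_internal.row_to_proto() for (1, 'Petee H'),
# copied from the output earlier in this article (a google.protobuf.Struct).
STRUCT_BYTES = bytes.fromhex(
    "0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f"
)


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def encode_office_dog(dog_id: int, name: str) -> bytes:
    """Encode a hypothetical strongly typed message:

        message OfficeDog {
          int64  id   = 1;
          string name = 2;
        }

    Both fields are always written (a proto3 encoder would omit defaults).
    """
    name_bytes = name.encode("utf-8")
    id_field = encode_varint((1 << 3) | 0) + encode_varint(dog_id & 0xFFFFFFFFFFFFFFFF)
    name_field = encode_varint((2 << 3) | 2) + encode_varint(len(name_bytes)) + name_bytes
    return id_field + name_field


typed = encode_office_dog(1, "Petee H")
print(len(STRUCT_BYTES), len(typed))  # 36 11
```

The typed message identifies columns by field number and stores id as a one-byte varint, while the Struct spells out "id" and "name" and stores id as an eight-byte double. The trade-off is that a typed consumer needs the schema to interpret the bytes.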

Conclusion

And this is how you can leverage CockroachDB CDC Queries with built-in functions. The function is not available in any release today, but it could be, given enough demand. Hopefully you've found this article useful. Please reach out to our team if you need this capability, and we will consider it for the future.
