Using CockroachDB CDC with Apache Pulsar


As of this writing, CockroachDB does not have a direct integration with Apache Pulsar. This tutorial is an attempt to integrate CockroachDB CDC with Pulsar via our existing Kafka sink.


Previous articles on CockroachDB CDC


Motivation

Apache Pulsar is a cloud-native distributed messaging and streaming platform. In my customer conversations, it most often comes up in comparison to Apache Kafka. I have a customer who needs Pulsar sink support because they rely on Pulsar's multiregion capabilities. CockroachDB does not have a native Pulsar sink; however, the Pulsar project supports the Kafka protocol via Kafka-on-Pulsar (KoP), and that's the core of today's article.

This tutorial assumes you have an enterprise license; you can also leverage our managed offerings, where enterprise changefeeds are enabled by default. I am going to demonstrate the steps using a Docker environment instead.

High Level Steps

  • Deploy Apache Pulsar
  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy a Kafka Consumer
  • Verify
  • Conclusion

Step by step instructions

Deploy Apache Pulsar

Since I'm using Docker, I'm relying on the KoP Docker Compose environment provided by StreamNative, the company that spearheads development of Apache Pulsar.

I've used the service definition from the KoP example almost as is, aside from a few differences:

pulsar:
    container_name: pulsar
    hostname: pulsar
    image: streamnative/sn-pulsar:2.11.0.5
    command: >
      bash -c "bin/apply-config-from-env.py conf/standalone.conf &&
      exec bin/pulsar standalone -nss -nfw" # disable stream storage and functions worker
    environment:
      allowAutoTopicCreationType: partitioned
      brokerDeleteInactiveTopicsEnabled: "false"
      PULSAR_PREFIX_messagingProtocols: kafka
      PULSAR_PREFIX_kafkaListeners: PLAINTEXT://pulsar:9092
      PULSAR_PREFIX_brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
      PULSAR_PREFIX_webServicePort: "8088"
    ports:
      - 6650:6650
      - 8088:8088
      - 9092:9092

Compared to the KoP readme, I made the following changes:

  • Removed PULSAR_PREFIX_kafkaAdvertisedListeners: PLAINTEXT://127.0.0.1:19092, as I don't need it.
  • Changed the exposed port from - 19092:9092 to - 9092:9092.
  • Pointed the PULSAR_PREFIX_kafkaListeners address at the Docker container with the hostname pulsar, since I need to access the address from other containers and can't rely on localhost.
  • Changed the web service port to PULSAR_PREFIX_webServicePort: "8088", because the default conflicts with the CockroachDB DB Console port, and exposed it, i.e. 8088:8088.
  • Selected a more recent version of the image than the one in the KoP readme.
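Before wiring up CockroachDB, it's worth confirming that the Kafka protocol handler actually loaded. A quick sanity check, assuming the Compose file lives in the current directory:

docker compose up -d pulsar
docker compose logs pulsar | grep -i kafka   # look for lines showing the Kafka listener starting on port 9092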

Deploy a CockroachDB cluster with enterprise changefeeds

I am using a 3-node cluster in Docker; if you've followed my previous articles, you should be familiar with it.
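For reference, a minimal sketch of what one node's Compose service might look like (the image tag, service names, and init step are illustrative, not the exact setup from the earlier articles):

roach1:
    container_name: roach1
    hostname: roach1
    image: cockroachdb/cockroach:v22.2.7
    command: start --insecure --join=roach1,roach2,roach3
    ports:
      - 26257:26257 # SQL
      - 8080:8080   # DB Console, which is why Pulsar's web service port moved to 8088
# roach2 and roach3 are defined the same way minus the published ports; run
# `cockroach init --insecure --host=roach1` once to bootstrap the cluster.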

I am using Flyway to set up the schema and seed the tables. The actual schema and data are taken from the changefeed examples in our docs. The only difference is that I'm using a database called example.
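If you'd rather skip Flyway, the schema and seed data boil down to something like the following, adapted from the docs example (the database name example is my own choice):

CREATE DATABASE IF NOT EXISTS example;
USE example;

CREATE TABLE office_dogs (
    id INT PRIMARY KEY,
    name STRING);

INSERT INTO office_dogs VALUES (1, 'Petee'), (2, 'Carl');
UPDATE office_dogs SET name = 'Petee H' WHERE id = 1; -- matches the 'Petee H' seen later in the changefeed output

CREATE TABLE employees (
    dog_id INT REFERENCES office_dogs (id),
    employee_name STRING DEFAULT NULL); -- no explicit primary key, so CockroachDB adds a hidden rowid column

INSERT INTO employees (dog_id, employee_name) VALUES (1, 'Lauren'), (2, 'Spencer');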

To enable CDC we need to execute the following commands:

SET CLUSTER SETTING cluster.organization = '<organization name>';

SET CLUSTER SETTING enterprise.license = '<secret>';

SET CLUSTER SETTING kv.rangefeed.enabled = true;

Again, if you don't have an enterprise license, you won't be able to complete this tutorial. Feel free to use our Dedicated or Serverless instances if you want to follow along.
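Before moving on, you can read the rangefeed setting back to confirm it took effect:

SHOW CLUSTER SETTING kv.rangefeed.enabled; -- should return true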

Finally, after the tables and the data are in place, we can create a changefeed on these tables.

CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 'kafka://pulsar:9092';

Here I am using the Kafka port and the address of the Pulsar cluster, in my case pulsar.
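As an aside, the statement accepts the usual changefeed options via a WITH clause if you want per-row timestamps or resolved watermarks; a hypothetical variant (I'm sticking with the basic form in this tutorial):

CREATE CHANGEFEED FOR TABLE office_dogs, employees
  INTO 'kafka://pulsar:9092'
  WITH updated, resolved = '10s'; -- emit updated timestamps plus a resolved message every 10 seconds

The basic statement returns a job ID and per-topic notices: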

        job_id
----------------------
  855538618543276035
(1 row)

NOTICE: changefeed will emit to topic office_dogs
NOTICE: changefeed will emit to topic employees

Time: 50ms total (execution 49ms / network 1ms)

Everything seems to work, and the changefeed does not error out.
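To double-check, you can also inspect the job itself; it should report a running status:

SHOW CHANGEFEED JOBS; -- look for status = 'running' on the job ID returned above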

Deploy a Kafka Consumer

To validate that data is being written to Pulsar, we need to stand up a Kafka client. I've created an image which downloads and installs Kafka. Once the entire Docker Compose environment is running, we can access the client and run the console consumer to verify.
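The image itself is nothing fancy; the download-and-install step amounts to something like this (the version and mirror are illustrative):

curl -s https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz | tar -xz -C /opt  # fetch and unpack Kafka
ln -s /opt/kafka_2.13-3.4.0 /opt/kafka                                                      # so the tools live under /opt/kafka/bin

Inside the client container: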

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic office_dogs --from-beginning
{"after": {"id": 1, "name": "Petee H"}}
{"after": {"id": 2, "name": "Carl"}}

If we want to validate new data is flowing, let's insert another record into CockroachDB:

INSERT INTO office_dogs VALUES (3, 'Test');

The consumer will print the new row:

{"after": {"id": 3, "name": "Test"}}

Since we've created two topics, let's now look at the employees topic.

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic employees --from-beginning
{"after": {"dog_id": 1, "employee_name": "Lauren", "rowid": 855539880336523267}}
{"after": {"dog_id": 2, "employee_name": "Spencer", "rowid": 855539880336654339}}

Similarly, let's update a record and see the changes propagate to Pulsar.

UPDATE employees SET employee_name = 'Spencer Kimball' WHERE dog_id = 2;

The consumer prints:

{"after": {"dog_id": 2, "employee_name": "Spencer Kimball", "rowid": 855539880336654339}}

Verify

We've confirmed we can produce messages to Pulsar topics using the Kafka protocol via KoP. We've also confirmed we can consume using the Kafka console consumer. We can also use the native Pulsar tooling to confirm the data is consumable from Pulsar. I installed the Pulsar Python client (pip install pulsar-client) on the Kafka client machine and created a Python script with the following code:

import pulsar

client = pulsar.Client('pulsar://pulsar:6650')
consumer = client.subscribe('employees',
                            subscription_name='my-sub')

try:
    while True:
        msg = consumer.receive()
        print("Received message: '%s'" % msg.data())
        consumer.acknowledge(msg)  # ack so Pulsar does not redeliver the message
except KeyboardInterrupt:
    pass
finally:
    client.close()  # close the connection cleanly on Ctrl-C

Once I execute the script, the consumer connects and subscribes:

root@kafka-client:/opt/kafka# python3 consume_messages.py 
2023-04-11 14:17:21.761 INFO  [281473255101472] Client:87 | Subscribing on Topic :employees
2023-04-11 14:17:21.762 INFO  [281473255101472] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000
2023-04-11 14:17:21.762 INFO  [281473255101472] ConnectionPool:97 | Created connection for pulsar://pulsar:6650
2023-04-11 14:17:21.763 INFO  [281473230237984] ClientConnection:388 | [172.28.0.3:33826 -> 172.28.0.6:6650] Connected to broker
2023-04-11 14:17:21.771 INFO  [281473230237984] HandlerBase:72 | [persistent://public/default/employees-partition-0, my-sub, 0] Getting connection from pool
2023-04-11 14:17:21.776 INFO  [281473230237984] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000
2023-04-11 14:17:21.776 INFO  [281473230237984] ConnectionPool:97 | Created connection for pulsar://localhost:6650
2023-04-11 14:17:21.776 INFO  [281473230237984] ClientConnection:390 | [172.28.0.3:33832 -> 172.28.0.6:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
2023-04-11 14:17:21.786 INFO  [281473230237984] ConsumerImpl:238 | [persistent://public/default/employees-partition-0, my-sub, 0] Created consumer on broker [172.28.0.3:33832 -> 172.28.0.6:6650] 
2023-04-11 14:17:21.786 INFO  [281473230237984] MultiTopicsConsumerImpl:274 | Successfully Subscribed to a single partition of topic in TopicsConsumer. Partitions need to create : 0
2023-04-11 14:17:21.786 INFO  [281473230237984] MultiTopicsConsumerImpl:137 | Successfully Subscribed to Topics

Let's insert a record into the employees table and then update it:

INSERT INTO employees (dog_id, employee_name) VALUES (3, 'Test');
UPDATE employees SET employee_name = 'Artem' WHERE dog_id = 3;

The Pulsar client output:

Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Test", "rowid": 855745376561364994}}''
Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Artem", "rowid": 855745376561364994}}''

Conclusion

And this is how you can leverage an existing CockroachDB capability with services like Apache Pulsar that we don't support natively. Hopefully you've found this article useful and can start leveraging the existing Kafka sink with other Kafka-compatible message brokers.
