@mrflip
Last active March 28, 2024
Running Debezium / Kafka Connect locally with homebrew

Initial setup

  • Install all the things: brew install kafka zookeeper maven openjdk
  • Make a directory for plugins: mkdir -p /usr/local/share/kafka/plugins
  • Edit the file /usr/local/etc/kafka/connect-standalone.properties and uncomment the line setting up a plugin path: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
  • Start all the things: brew services start zookeeper and brew services start kafka
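The config edit above can be scripted. This is a sketch only: a local stand-in file mimics `/usr/local/etc/kafka/connect-standalone.properties` and a local directory stands in for the plugins folder, so adjust the paths for your Homebrew prefix (Apple Silicon uses /opt/homebrew).

```shell
# Stand-in for /usr/local/etc/kafka/connect-standalone.properties:
PROPS=./connect-standalone.properties
printf '#plugin.path=/usr/local/share/java\n' > "$PROPS"   # the line ships commented out

# Stand-in for /usr/local/share/kafka/plugins:
mkdir -p ./kafka-plugins

# Uncomment the plugin.path line and add the plugin directories
# (-i.bak works with both GNU and BSD sed):
sed -i.bak 's|^#plugin.path=.*|plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors|' "$PROPS"
grep '^plugin.path=' "$PROPS"
```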

Testing a simple connector

  • Make a file named file_source_brew.properties as shown below
  • Run echo "Hello world" > /tmp/file_source_testfile.txt to create the file we'll use as a source
  • Run connect-standalone /usr/local/etc/kafka/connect-standalone.properties ./file_source_brew.properties. See below for the key milestones that should appear in the logs.
  • Running kafka-topics --list --bootstrap-server localhost:9092 should show that the file_source_brew topic has been created.
  • To test,
    • Run kafka-console-consumer --bootstrap-server localhost:9092 --topic file_source_brew in one terminal tab
    • In another, run cat >> /tmp/file_source_testfile.txt.
    • Anything you type into the cat should be echoed by the consumer you just started.
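The file side of that test loop, condensed (the console consumer runs in another tab; the path matches the connector config):

```shell
# Seed the source file and append a line, as the connector would observe:
echo "Hello world" > /tmp/file_source_testfile.txt
echo "typed into cat" >> /tmp/file_source_testfile.txt
cat /tmp/file_source_testfile.txt
```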

Key milestones in the connector's logs

  • Added plugin '...' -- make sure the plugins you are trying to run show up here; in this case, org.apache.kafka.connect.file.FileStreamSourceConnector.
  • The config file shows the kafka server it's connecting to: bootstrap.servers = [localhost:9092]
  • The lines Kafka cluster ID: Gobbledyjumble up through INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57) show that you have connected and that the basic connect stuff is working
  • Instantiated connector file_source_brew with version -- the plugin has been found
  • Creating task file_source_brew- -- and started
  • Created connector file_source_brew (org.apache.kafka.connect.cli.ConnectStandalone:109) -- the topic has been created
  • Error while fetching metadata with correlation id 3 : {connect-test=LEADER_NOT_AVAILABLE} -- ignore this, it's teething pains as the worker figures itself out
  • INFO [file_source_brew|task-0] [Producer clientId=connector-producer-file_source_brew-0] Resetting the last seen epoch is good news.
  • [file_source_brew|task-0|offsets] WorkerSourceTask{id=file_source_brew-0} Either no records were produced by the task since the last offset commit... is good news, it reached the end of the file.
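If you tee the connector's output to a file, you can scan for these milestones mechanically. A sketch with abbreviated stand-in log lines (in practice you would run `connect-standalone ... 2>&1 | tee ./connect.log`):

```shell
# Stand-in log file with abbreviated milestone lines:
LOG=./connect.log
printf "%s\n" \
  "INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector'" \
  "INFO Kafka Connect started" \
  "INFO Created connector file_source_brew" > "$LOG"

# Count how many of the key milestones appear:
grep -cE "Added plugin|Kafka Connect started|Created connector" "$LOG"
# prints 3
```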

Setting up a Postgres connector

  • Download the Postgres connector (these instructions are tested against Debezium v1.8) and copy it to a convenient directory -- for this example, we'll work in $HOME/code/connectors
mkdir -p $HOME/code/connectors
cd $HOME/code/connectors
open .
# (copy the file from your downloads folder to the one we just made)
tar xvzf debezium-connector-postgres-1.8.1.Final-plugin.tar.gz

You should now see a folder named debezium-connector-postgres containing several files, one of them named something like debezium-connector-postgres-1.8.1.Final.jar. To enable the plugin, make a symlink from your connector directory (the immediate parent folder holding the jars) to a directory in /usr/local/share/kafka/plugins:

ln -snf $HOME/code/connectors/debezium-connector-postgres /usr/local/share/kafka/plugins/debezium-connector-postgres
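ln -snf replaces any existing link in place (-s symlink, -n don't follow an existing link, -f force replacement). Here is the same shape with hypothetical local directories standing in for the real paths:

```shell
# Stand-ins for $HOME/code/connectors and /usr/local/share/kafka/plugins:
mkdir -p ./connectors/debezium-connector-postgres ./plugins
touch ./connectors/debezium-connector-postgres/debezium-connector-postgres-1.8.1.Final.jar

# Link the connector folder into the plugins directory:
ln -snf "$PWD/connectors/debezium-connector-postgres" ./plugins/debezium-connector-postgres
ls ./plugins/debezium-connector-postgres/
```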

If you re-run the file connector example, you should see it in the mix:

[2022-03-16 14:41:02,870] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectStandalone:77)
[2022-03-16 14:41:02,889] INFO Loading plugin from: /usr/local/share/kafka/plugins/debezium-connector-postgres (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:265)
[2022-03-16 14:41:03,437] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/local/share/kafka/plugins/debezium-connector-postgres/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:288)
[2022-03-16 14:41:03,438] INFO Added plugin 'io.debezium.connector.postgresql.PostgresConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
[2022-03-16 14:41:03,438] INFO Added plugin 'io.debezium.converters.ByteBufferConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
[2022-03-16 14:41:03,438] INFO Added plugin 'io.debezium.converters.CloudEventsConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
[2022-03-16 14:41:03,438] INFO Added plugin 'io.debezium.transforms.ExtractNewRecordState' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)

Install and configure Postgres

brew install postgres

Edit /usr/local/var/postgres/postgresql.conf and uncomment the following line:

wal_level = logical			# minimal, replica, or logical
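That uncomment-and-edit can also be scripted; a minimal sketch against a local stand-in for /usr/local/var/postgres/postgresql.conf:

```shell
# Stand-in for /usr/local/var/postgres/postgresql.conf:
CONF=./postgresql.conf
printf '#wal_level = replica\t\t\t# minimal, replica, or logical\n' > "$CONF"

# Uncomment the line and raise the level to logical:
sed -i.bak 's/^#wal_level = replica/wal_level = logical/' "$CONF"
grep '^wal_level' "$CONF"
```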

Start/restart the server:

brew services restart postgresql

Set up a test database

You can skip all this if you already have a running, loaded database. For a graphical interface to Postgres, download and install Beekeeper Studio (https://www.beekeeperstudio.io/). Before connecting, though, we must set up a user and database.

Start the postgres shell:

psql -U $USER -h localhost -p 5432

(This assumes your postgres root user is named for your system username, which is the homebrew default. If that doesn't work, try psql -U postgres -h localhost -p 5432 ).

Run the following commands:

create database inventory;
create user happy with encrypted password 'HAPPY_PASSWORD';
alter  user happy with encrypted password 'HAPPY_PASSWORD';
grant all privileges on database inventory   to happy;
\q

(Typing backslash-q or hitting control-d ends the session.) The actual session will look something like this:

~/code/connectors$ psql -U $USER -h localhost -p 5432 postgres
psql (14.2)
Type "help" for help.

postgres=# create database inventory;
CREATE DATABASE

postgres=# create user happy with encrypted password 'HAPPY_PASSWORD';
CREATE ROLE

postgres=# alter  user happy with encrypted password 'HAPPY_PASSWORD';
ALTER ROLE

postgres=# grant all privileges on database inventory   to happy;
GRANT

postgres=# \q

Now run Beekeeper Studio, enter your connection details, set up the database tables and import your data.

Running the Postgres connector

Create the pg_source_brew.properties file given below, and edit the settings it calls out. Kill any other running connect-standalone sessions and run

connect-standalone /usr/local/etc/kafka/connect-standalone.properties ./pg_source_brew.properties ./file_source_brew.properties

(this will run both the Postgres connector and the file source connector just as a sanity check; going forward you don't need to run the latter).

# based on /usr/local/etc/kafka/connect-file-source.properties
name = file_source_brew
connector.class = FileStreamSource
tasks.max = 1
file = /tmp/file_source_testfile.txt
topic = file_source_brew
Full records dumped

To see the full before-and-after of each database change, run this for each table:
```
ALTER TABLE "table1" REPLICA IDENTITY full;
ALTER TABLE "other_table" REPLICA IDENTITY full;
ALTER TABLE "yet_another_table" REPLICA IDENTITY full;
```
# Tested with debezium 1.8.1-Final and postgres 14 using homebrew
# Name of this connector in kafka &co.
name = pg_source_brew
connector.class = io.debezium.connector.postgresql.PostgresConnector
# The logical decoding plugin Debezium uses to follow the Postgres WAL (not a name you choose)
plugin.name = pgoutput
tasks.max = 1
#
# There is a very good chance that you should change the database.user entry to
# your local username -- running `echo $USER` will show you what to enter here.
# If you have any connection problems, try running
# `psql -U your_username -h your_hostname -p your_portname your_databasename`
# and first confront any issues it reports.
#
database.user = happy
database.password = HAPPY_PASSWORD
database.hostname = localhost
database.port = 5432
#
# Change these to the actual database name and schema-qualified table name(s) you are using
database.dbname = inventory
database.server.name = inventory
table.include.list = public.products,public.locations,public.product_locations
#
# Name of the slot tracking the point in the postgres logs to read from
slot.name = pg_source_brew
#
# Change this to false in production -- it resets history on restart.
slot.drop.on.stop = true
#
# The regex to match against postgres' `database.schema.table` syntax, edit to your preference:
transforms = route
transforms.route.type = org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex = ([^.]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement = $1.brew.$3
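A quick sanity check of the route regex above, replayed with sed (shell backreferences \1 and \3 instead of the connector's $1 and $3):

```shell
# The RegexRouter rewrites topic names of the form server.schema.table:
echo "inventory.public.products" | sed -E 's/([^.]+)\.([^.]+)\.([^.]+)/\1.brew.\3/'
# prints inventory.brew.products
```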

Weird things that might happen

  • Socket timeout very early in making the connector go:

  • if you wish to connect on localhost, you may need to edit /usr/local/etc/kafka/server.properties and add the line listeners=PLAINTEXT://localhost:9092 in the "Socket Server" section. The "Checking connectivity" section below shows how to verify the listener is reachable.

  • To find your local IP, ipconfig getifaddr en0 (wifi) or ipconfig getifaddr en1 (wired) might give the answer; otherwise visit the System Preferences / Network pane to find it.

Blocked mirror for repositories while building a connector from source:

[ERROR] Failed to execute goal org.apache.maven.plugins:... Execution validate of .... Could not transfer artifact io.confluent:build-tools:pom:6.0.3 from/to maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: [confluent (http://packages.confluent.io/maven/, default, releases+snapshots)] -> [Help 1]

This might be because the Confluent maven repo is http, not https, tripping up a well-intentioned security feature. Edit the file /usr/local/Cellar/maven/*/libexec/conf/settings.xml (where * is the version) and comment out the section for maven-default-http-blocker (from its opening <mirror> tag to the matching </mirror>):

<!--
        <mirror>
            <id>maven-default-http-blocker</id>
            <mirrorOf>external:dummy:*</mirrorOf>
            <name>Pseudo repository to mirror external repositories initially using HTTP.</name>
            <url>http://0.0.0.0/</url>
            <blocked>true</blocked>
        </mirror>
-->

Debezium won't compile because of JDK version

Maven is discovering the macOS system JDK, which is way old. To use the brew-installed JDK:

  • ls /usr/local/Cellar/openjdk/ and note the version number.
  • run export JAVA_HOME=/usr/local/Cellar/openjdk/17.0.2/libexec/openjdk.jdk/Contents/Home but with your version number replacing 17.0.2
  • run export PATH="/usr/local/opt/openjdk/bin:$PATH"
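Those two steps can be combined so the version is discovered rather than typed. The Cellar layout here is simulated with local directories standing in for /usr/local/Cellar/openjdk:

```shell
# Stand-in for /usr/local/Cellar/openjdk with two installed versions:
CELLAR=./Cellar/openjdk
mkdir -p "$CELLAR/11.0.14" "$CELLAR/17.0.2"

# Pick the highest version (sort -V understands dotted version numbers):
JDK_VER=$(ls "$CELLAR" | sort -V | tail -1)
export JAVA_HOME="$CELLAR/$JDK_VER/libexec/openjdk.jdk/Contents/Home"
echo "$JDK_VER"
# prints 17.0.2
```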

Kafka Sanity Checks

To view the kafka logs, open a dedicated terminal tab and run: tail -f /usr/local/var/log/kafka/kafka_output.log. Running brew services restart kafka should produce a lot of logs, ending with something like the following:

[2022-03-16 15:59:03,563] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-03-16 15:59:04,062] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2022-03-16 15:59:04,325] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2022-03-16 15:59:04,333] INFO starting (kafka.server.KafkaServer)
[2022-03-16 15:59:04,333] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2022-03-16 15:59:04,381] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2022-03-16 15:59:04,413] INFO Client environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT (org.apache.zookeeper.ZooKeeper)
[2022-03-16 15:59:04,414] INFO Client environment:host.name=quicktempo (org.apache.zookeeper.ZooKeeper)
[2022-03-16 15:59:04,414] INFO Client environment:java.version=17.0.2 (org.apache.zookeeper.ZooKeeper)
[2022-03-16 15:59:04,414] INFO Client environment:java.vendor=Homebrew (org.apache.zookeeper.ZooKeeper)
...
[2022-03-16 15:59:04,461] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2022-03-16 15:59:04,466] INFO Opening socket connection to server localhost/[0:0:0:0:0:0:0:1]:2181. (org.apache.zookeeper.ClientCnxn)
[2022-03-16 15:59:04,467] INFO SASL config status: Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2022-03-16 15:59:04,480] INFO Socket connection established, initiating session, client: /[0:0:0:0:0:0:0:1]:65460, server: localhost/[0:0:0:0:0:0:0:1]:2181 (org.apache.zookeeper.ClientCnxn)
[2022-03-16 15:59:04,720] INFO Session establishment complete on server localhost/[0:0:0:0:0:0:0:1]:2181, session id = 0x10023842cd30005, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2022-03-16 15:59:04,730] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2022-03-16 15:59:05,966] INFO [feature-zk-node-event-process-thread]: Starting (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
[2022-03-16 15:59:06,529] INFO Updated cache from existing <empty> to latest FinalizedFeaturesAndEpoch(features=Features{}, epoch=0). (kafka.server.FinalizedFeatureCache)
[2022-03-16 15:59:06,619] INFO Cluster ID = SQtIfs4ZQpK2qc7Mvhy4IQ (kafka.server.KafkaServer)
... many config files fly by...
[2022-03-16 15:59:08,212] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-03-16 15:59:08,212] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-03-16 15:59:08,213] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-03-16 15:59:08,214] INFO [ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2022-03-16 15:59:08,876] INFO Loading logs from log dirs ArraySeq(/usr/local/var/lib/kafka-logs) (kafka.log.LogManager)
[2022-03-16 15:59:08,882] INFO Attempting recovery for all logs in /usr/local/var/lib/kafka-logs since no clean shutdown file was found (kafka.log.LogManager)
... this means things are good:
[2022-03-16 15:59:09,075] INFO [LogLoader partition=some.happy.topic-0, dir=/usr/local/var/lib/kafka-logs] Recovering unflushed segment 0 (kafka.log.LogLoader$)

Checking connectivity

Run telnet localhost 9092 (or replace localhost with whatever you are trying to connect to)

  • telnet: Unable to connect to remote host means it is not running on that port and interface
  • Connected to localhost means it is working; hit enter twice to move to the next level.
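telnet is not installed by default on recent macOS; nc (netcat) ships with the system and performs the same check. A small sketch (the host and port are whatever you are probing):

```shell
# -z: probe without sending data; the exit status tells you if the port is open.
check_port() { nc -z "$1" "$2" >/dev/null 2>&1 && echo "open" || echo "closed"; }
check_port localhost 9092
```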

Healthy startup logs look something like the Kafka Sanity Checks excerpt above.
