- Install all the things: `brew install kafka zookeeper maven openjdk`
- Make a directory for plugins: `mkdir -p /usr/local/share/kafka/plugins`
- Edit `/usr/local/etc/kafka/connect-standalone.properties` and uncomment the line that sets the plugin path: `plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors`
- Start all the things: `brew services start zookeeper` and `brew services start kafka`
- Make a file named `file_source_brew.properties` as shown below.
- Run `echo "Hello world" > /tmp/file_source_testfile.txt` to create the file we'll use as a source.
- Run `connect-standalone /usr/local/etc/kafka/connect-standalone.properties ./file_source_brew.properties`. See below for the key milestones that should appear in the logs.
- Running `kafka-topics --list --bootstrap-server localhost:9092` should show that the `file_source_brew` topic has been created.
- To test:
  - Run `kafka-console-consumer --bootstrap-server localhost:9092 --topic file_source_brew` in one terminal tab.
  - In another, run `cat >> /tmp/file_source_testfile.txt`. Anything you type into the cat should be echoed by the consumer you just started.
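For reference, a minimal `file_source_brew.properties` can look like this -- these are the standard FileStreamSource connector settings, with the file and topic names matching the commands above; adjust if your paths differ:

```
name=file_source_brew
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/file_source_testfile.txt
topic=file_source_brew
```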
Key milestones in the logs:

- `Added plugin '...` -- make sure the plugins you are trying to run show up here; in this case, `org.apache.kafka.connect.file.FileStreamSourceConnector`.
- The config dump shows the Kafka server it's connecting to: `bootstrap.servers = [localhost:9092]`
- The lines from `Kafka cluster ID: Gobbledyjumble` up through `INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)` show that you have connected and that the basic Connect machinery is working.
- `Instantiated connector file_source_brew with version` -- the plugin has been found.
- `Creating task file_source_brew-` -- and started.
- `Created connector file_source_brew (org.apache.kafka.connect.cli.ConnectStandalone:109)` -- the topic has been created.
- `Error while fetching metadata with correlation id 3 : {connect-test=LEADER_NOT_AVAILABLE}` -- ignore this; it's teething pains as the worker figures itself out.
- `INFO [file_source_brew|task-0] [Producer clientId=connector-producer-file_source_brew-0] Resetting the last seen epoch` is good news.
- `[file_source_brew|task-0|offsets] WorkerSourceTask{id=file_source_brew-0} Either no records were produced by the task since the last offset commit...` is good news: it reached the end of the file.
- Download the Debezium Postgres connector (these instructions are tested against Debezium v1.8) and copy it to a convenient directory -- for this example, we'll work in `$HOME/code/connectors`:

```shell
mkdir -p $HOME/code/connectors
cd $HOME/code/connectors
open .
# (copy the file from your downloads folder to the one we just made)
tar xvzf debezium-connector-postgres-1.8.1.Final-plugin.tar.gz
```

You should now see a folder named `debezium-connector-postgres` containing several files, one of them named something like `debezium-connector-postgres-1.8.1.Final.jar`. To enable the plugin, make a symlink from your connector directory (the immediate parent folder holding the jars) to a directory in `/usr/local/share/kafka/plugins`:

```shell
ln -snf $HOME/code/connectors/debezium-connector-postgres /usr/local/share/kafka/plugins/debezium-connector-postgres
```
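If the `-snf` flags are unfamiliar, this sketch demonstrates them against throwaway directories (the paths here are hypothetical, not the real plugin locations):

```shell
# -s: symbolic link, -n: treat an existing link-to-directory as a plain file,
# -f: replace it. Together they make the command safely re-runnable.
tmp=$(mktemp -d)
mkdir -p "$tmp/connectors/debezium-connector-postgres" "$tmp/plugins"
ln -snf "$tmp/connectors/debezium-connector-postgres" "$tmp/plugins/debezium-connector-postgres"
readlink "$tmp/plugins/debezium-connector-postgres"
# Re-running with a new target replaces the link rather than nesting a link inside it
mkdir -p "$tmp/connectors/other"
ln -snf "$tmp/connectors/other" "$tmp/plugins/debezium-connector-postgres"
readlink "$tmp/plugins/debezium-connector-postgres"
```

Without `-n`, the second `ln` would create a new link *inside* the directory the old link points to, which is a common source of mysteriously missing plugins.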
If you re-run the file connector example, you should see it in the mix:
```
[2022-03-16 14:41:02,870] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectStandalone:77)
[2022-03-16 14:41:02,889] INFO Loading plugin from: /usr/local/share/kafka/plugins/debezium-connector-postgres (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:265)
[2022-03-16 14:41:03,437] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/local/share/kafka/plugins/debezium-connector-postgres/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:288)
[2022-03-16 14:41:03,438] INFO Added plugin 'io.debezium.connector.postgresql.PostgresConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
[2022-03-16 14:41:03,438] INFO Added plugin 'io.debezium.converters.ByteBufferConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
[2022-03-16 14:41:03,438] INFO Added plugin 'io.debezium.converters.CloudEventsConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
[2022-03-16 14:41:03,438] INFO Added plugin 'io.debezium.transforms.ExtractNewRecordState' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
```
Install Postgres: `brew install postgres`

Edit `/usr/local/var/postgres/postgresql.conf` and uncomment the following line:

`wal_level = logical # minimal, replica, or logical`
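If you'd rather script that edit, a `sed` one-liner like the following should work -- shown here against a scratch copy rather than the real config file; swap in the actual path on your machine:

```shell
# Hypothetical scratch copy standing in for /usr/local/var/postgres/postgresql.conf
conf=$(mktemp)
echo '#wal_level = replica                    # minimal, replica, or logical' > "$conf"
# Uncomment the wal_level line and set it to logical.
# -i.bak (with a suffix) works on both BSD/macOS and GNU sed.
sed -i.bak -E 's/^#?wal_level[[:space:]]*=.*/wal_level = logical/' "$conf"
grep '^wal_level' "$conf"
```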
Start/restart the server:
brew services restart postgresql
You can skip all this if you already have a running, loaded database. For a graphical interface to Postgres, download and install Beekeeper Studio (https://www.beekeeperstudio.io/). Before that, though, we must set up a user and database.
Start the postgres shell:

`psql -U $USER -h localhost -p 5432`

(This assumes your Postgres root user is named for your system username, which is the Homebrew default. If that doesn't work, try `psql -U postgres -h localhost -p 5432`.)
Run the following commands:
```
create database inventory;
create user happy with encrypted password 'HAPPY_PASSWORD';
alter user happy with encrypted password 'HAPPY_PASSWORD';
grant all privileges on database inventory to happy;
\q
```
(Typing backslash-q or hitting control-d will end the session.) The actual session will look something like this:
```
~/code/connectors$ psql -U $USER -h localhost -p 5432 postgres
psql (14.2)
Type "help" for help.

postgres=# create database inventory;
CREATE DATABASE
postgres=# create user happy with encrypted password 'HAPPY_PASSWORD';
CREATE ROLE
postgres=# alter user happy with encrypted password 'HAPPY_PASSWORD';
ALTER ROLE
postgres=# grant all privileges on database inventory to happy;
GRANT
postgres=# \q
```
Now run Beekeeper Studio, enter your connection details, set up the database tables, and import your data.
Create the `pg_source_brew.properties` file given below, and edit the sections it instructs. Kill any other running connect-standalone sessions and run:

`connect-standalone /usr/local/etc/kafka/connect-standalone.properties ./pg_source_brew.properties ./file_source_brew.properties`

(This will run both the Postgres source connector and the file source connector, just as a sanity check; going forward you don't need the latter.)
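As a starting point, `pg_source_brew.properties` might look like the sketch below, wired up to the database and user created above. The server name and `plugin.name` choice are assumptions to adjust for your setup; the password is the placeholder from the psql session:

```
name=pg_source_brew
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=localhost
database.port=5432
database.user=happy
database.password=HAPPY_PASSWORD
database.dbname=inventory
database.server.name=pg_source_brew
plugin.name=pgoutput
```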