
@tilakpatidar
Last active August 27, 2020 22:31
Postgres to Kafka streaming using Debezium
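# Run postgres instance (POSTGRES_PASSWORD must match database.password in the connector config)
docker run --name postgres -p 5000:5432 -e POSTGRES_PASSWORD=postgres debezium/postgres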
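The Postgres container needs a few seconds before it accepts connections, so psql or the connector can fail if started immediately. A small retry helper can wait for it. The helper name wait_until is hypothetical and not part of the original gist; pg_isready ships with the Postgres image, and -h localhost makes it test the TCP listener rather than the Unix socket.

```shell
# Hypothetical helper (not part of the original gist): run a command up to
# <tries> times, one second apart; return 0 on the first success, 1 otherwise.
wait_until() {
  local tries="$1" i=1
  shift
  while [ "$i" -le "$tries" ]; do
    "$@" && return 0
    # Only sleep if another attempt follows.
    [ "$i" -lt "$tries" ] && sleep 1
    i=$((i + 1))
  done
  return 1
}

# Example: wait up to ~30 s for the postgres container started above.
# wait_until 30 docker exec postgres pg_isready -h localhost -U postgres
```

The command to wait on is passed as separate arguments rather than a string, so no eval or extra quoting is needed.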
# Run zookeeper instance
docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
# Run kafka instance
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka
# Run kafka connect
docker run -it --name connect -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my-connect-configs \
  -e OFFSET_STORAGE_TOPIC=my-connect-offsets \
  -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3 -d'/' | cut -f1 -d':') \
  --link zookeeper:zookeeper --link postgres:postgres --link kafka:kafka \
  debezium/connect
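The ADVERTISED_HOST_NAME value extracts the Docker host's IP address from DOCKER_HOST, which looks like tcp://<ip>:<port> when Docker runs inside a VM (for example with docker-machine). Wrapped in a function (the name docker_host_ip is hypothetical), the same cut pipeline can be checked on its own:

```shell
# Same pipeline as the ADVERTISED_HOST_NAME value, wrapped for testing.
docker_host_ip() {
  # Splitting "tcp://192.168.99.100:2376" on '/' gives field 3 = "192.168.99.100:2376";
  # splitting that on ':' gives field 1 = the bare address.
  echo "$1" | cut -f3 -d'/' | cut -f1 -d':'
}

# Example: docker_host_ip "$DOCKER_HOST"
```

When DOCKER_HOST is unset, as is common with Docker on Linux or Docker Desktop, the pipeline prints an empty string, so the connect container receives an empty ADVERTISED_HOST_NAME.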
# Open psql console
psql -h localhost -p 5000 -U postgres
CREATE DATABASE inventory;
\c inventory
CREATE TABLE dumb_table(id SERIAL PRIMARY KEY, name VARCHAR);
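If the connector POST is sent while Kafka Connect is still starting, curl only reports a connection error. Back in a shell, a polling helper can wait until the Connect REST API on port 8083 answers. This is a hypothetical sketch, not from the original gist; the URL and retry count are assumed defaults.

```shell
# Hypothetical helper: poll the Kafka Connect REST root until it answers
# (any HTTP response counts), retrying every 2 s up to <tries> times.
wait_for_connect() {
  local url="${1:-http://localhost:8083/}" tries="${2:-30}" i=1
  while [ "$i" -le "$tries" ]; do
    curl -s -o /dev/null "$url" && return 0
    [ "$i" -lt "$tries" ] && sleep 2
    i=$((i + 1))
  done
  return 1
}

# Example: wait_for_connect && echo "Kafka Connect is up"
```

curl is run without --fail on purpose: any HTTP answer, even an error status, shows the worker is listening.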
# Create connector using kafka connect
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "database.server.name": "dbserver1"
  }
}' | jq
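Re-running the POST above after inventory-connector exists is rejected by Kafka Connect with HTTP 409 Conflict. Kafka Connect also provides PUT /connectors/<name>/config, which creates the connector if it is missing and replaces its configuration otherwise. Its body is only the inner "config" object. A minimal sketch, where the helper name and file name are hypothetical:

```shell
# Hypothetical helper: create or update a connector via
# PUT /connectors/<name>/config. The file must hold only the inner
# "config" object, not the {"name": ..., "config": ...} wrapper used by POST.
upsert_connector() {
  local name="$1" config_file="$2" connect_url="${3:-http://localhost:8083}"
  curl -s -X PUT \
    -H "Accept:application/json" -H "Content-Type:application/json" \
    --data @"$config_file" \
    "$connect_url/connectors/$name/config"
}

# Example: upsert_connector inventory-connector inventory-config.json | jq
```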
# Verify that the connector was created
curl -H "Accept:application/json" localhost:8083/connectors/ | jq
# Verify configuration
curl -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector | jq
# Start a console consumer that watches the table's change-event topic
docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka watch-topic -a -k dbserver1.public.dumb_table
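The watcher stays silent until dumb_table changes. From another terminal, a few statements against the inventory database should produce messages on dbserver1.public.dumb_table. The helper below is a hypothetical sketch. It assumes psql is installed on the host and uses the host port 5000 and password postgres from the steps above.

```shell
# Hypothetical helper: insert, update and delete one row so the watcher
# shows change events. ON_ERROR_STOP makes psql exit non-zero on SQL errors.
make_changes() {
  PGPASSWORD="${PGPASSWORD:-postgres}" psql -h localhost -p 5000 -U postgres \
    -d inventory -v ON_ERROR_STOP=1 <<'SQL'
INSERT INTO dumb_table(name) VALUES ('first');
UPDATE dumb_table SET name = 'renamed' WHERE name = 'first';
DELETE FROM dumb_table WHERE name = 'renamed';
SQL
}

# Example: make_changes
```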
# Verify that the replication slot exists (run in the psql console)
SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
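The same check can be scripted from the host. This sketch assumes the connector uses Debezium's default slot name, debezium (the slot.name property changes it), plus the host port and password used above. slot_exists is a hypothetical helper name.

```shell
# Hypothetical helper: succeed if a replication slot with the given name exists.
# -tA prints the bare count with no headers or alignment.
slot_exists() {
  local slot="${1:-debezium}" count
  count=$(PGPASSWORD="${PGPASSWORD:-postgres}" psql -h localhost -p 5000 -U postgres \
    -d inventory -tA \
    -c "SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$slot';") || return 1
  [ "$count" = "1" ]
}

# Example: slot_exists debezium && echo "slot present"
```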
@shchokshi

shchokshi commented Apr 17, 2019

What would step 4 (# Run kafka connect, terminal 1) look like if I wanted to link Kafka Connect to an AWS Postgres RDS instance rather than running a Postgres instance via Docker? Thanks.

@kal72

kal72 commented May 1, 2019

@theprimo Thank you, it worked for my problem.

@ozgursolak

I only see error logs in terminal 1. How can I solve this problem?
