Last active
August 27, 2020 22:31
-
-
Save tilakpatidar/df43db1e1636d1a957f6f15ddc394ac7 to your computer and use it in GitHub Desktop.
Postgres to Kafka streaming using debezium
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Run postgres instance
# POSTGRES_PASSWORD matches "database.password" in the connector config below. Recent builds of
# the official postgres image (which debezium/postgres is based on) refuse to initialise without it.
docker run --name postgres -p 5000:5432 -e POSTGRES_PASSWORD=postgres debezium/postgres
# Run zookeeper instance
docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
# Run kafka instance
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka
# Run kafka connect
# ADVERTISED_HOST_NAME is the host part of $DOCKER_HOST (tcp://HOST:PORT -> HOST);
# it expands to an empty value when DOCKER_HOST is unset (e.g. native Docker on Linux).
docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3 -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link postgres:postgres --link kafka:kafka debezium/connect
# Open psql console
psql -h localhost -p 5000 -U postgres
CREATE DATABASE inventory;
-- Switch into the new database. Without this, dumb_table is created in the default
-- "postgres" database, which the connector ("database.dbname": "inventory") never reads.
\c inventory
CREATE TABLE dumb_table(id SERIAL PRIMARY KEY, name VARCHAR);
# Create connector using kafka connect
# "database.server.name" (dbserver1) is the prefix of the topic the watcher below subscribes to
# (dbserver1.public.dumb_table).
# The MySQL-connector-only settings "database.whitelist" and "database.history.kafka.*" are not
# defined by the Postgres connector and had no effect, so they are omitted here.
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "database.server.name": "dbserver1"
  }
}' | jq
# Verify created
curl -H "Accept:application/json" localhost:8083/connectors/ | jq
# Verify configuration
curl -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector | jq
# Verify the connector and its task are RUNNING; a FAILED task reports its error trace here,
# which the configuration endpoint above does not show.
curl -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector/status | jq
# Start a console viewer on kafka
# The watcher must be linked to the kafka container as well as zookeeper to read the topic.
docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka watch-topic -a -k dbserver1.public.dumb_table
# Verify the existence of replication slot in postgres (run inside the psql console)
-- The connector's logical replication slot should be listed while the connector is registered.
SELECT
    slot_name,
    plugin,
    slot_type,
    database,
    active,
    restart_lsn,
    confirmed_flush_lsn
FROM pg_replication_slots;
What would step 4 (#Run kafka connect - terminal1) look like if I want to link Kafka Connect to an AWS RDS Postgres instance rather than running a Postgres instance via Docker? Thanks.
@theprimo Thank you, it solved my problem.
I only see error logs in terminal 1. How can I solve this problem?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Change data capture events: Postgres to Kafka using Debezium
Working commands; please follow the sequence below
#Install Docker Toolbox, then open five Docker Quickstart terminals
#Every terminal must talk to the same Docker Machine VM - confirm with:
docker-machine ls
docker container ls
#Value to use for IP in the following commands, depending on the OS:
Windows - IP is the virtual machine's IP address
Linux - IP is the respective container's IP address
#Run postgres instance - terminal1
#Run detached (-d): pressing ctrl+c on a foreground, non-TTY `docker run` forwards SIGINT
#to the container and can shut postgres down before the `docker exec` step below.
#POSTGRES_PASSWORD matches "database.password" in the connector config.
docker run -d --name postgres -p 5000:5432 -e POSTGRES_PASSWORD=postgres debezium/postgres
#Follow the startup log; after proper db startup ("database system is ready to accept connections")
docker logs -f postgres
#ctrl+c now only stops following the log - the container keeps running
ctrl+c
#Run zookeeper instance - terminal2
docker run --interactive --tty --name zookeeper \
    --publish 2181:2181 \
    --publish 2888:2888 \
    --publish 3888:3888 \
    debezium/zookeeper
#Run kafka instance - terminal3
docker run --interactive --tty --name kafka \
    --publish 9092:9092 \
    --link zookeeper:zookeeper \
    debezium/kafka
#Run kafka connect - terminal4
#ADVERTISED_HOST_NAME = host part of $DOCKER_HOST (tcp://HOST:PORT); empty when DOCKER_HOST is unset
docker run --interactive --tty --name connect \
    --publish 8083:8083 \
    --env GROUP_ID=1 \
    --env CONFIG_STORAGE_TOPIC=my-connect-configs \
    --env OFFSET_STORAGE_TOPIC=my-connect-offsets \
    --env ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3 -d'/' | cut -f1 -d':') \
    --link zookeeper:zookeeper \
    --link postgres:postgres \
    --link kafka:kafka \
    debezium/connect
#Connect to psql console and create database - terminal1
#The container was started with --name postgres, so address it by name (no container-ID
#lookup needed) and launch psql in it directly instead of going through a bash shell.
docker exec -it postgres psql -U postgres
\l
CREATE DATABASE inventory;
-- Switch into the new database so the table lands where the connector is looking.
\c inventory
CREATE TABLE dumb_table(id SERIAL PRIMARY KEY, name VARCHAR);
#Create connector using kafka connect - terminal5
#"database.server.name" (dbserver1) is the prefix of the topic watched below (dbserver1.public.dumb_table).
#The MySQL-connector-only settings "database.whitelist" and "database.history.kafka.*" are not
#defined by the Postgres connector and had no effect, so they are omitted here.
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" IP:8083/connectors/ -d '
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "database.server.name": "dbserver1"
  }
}' | tac | tac
#Verify created - terminal5
curl -H "Accept:application/json" IP:8083/connectors/ | tac | tac
#Verify configuration - terminal5
curl -X GET -H "Accept:application/json" IP:8083/connectors/inventory-connector | tac | tac
#Verify the connector and its task are RUNNING - terminal5
#A FAILED task reports its error trace here; the configuration endpoint above does not show it.
curl -X GET -H "Accept:application/json" IP:8083/connectors/inventory-connector/status | tac | tac
#Start a console viewer on kafka - terminal5
#dbserver1 in the topic name matches "database.server.name" in the connector config
docker run --interactive --tty --rm --name watcher \
    --link zookeeper:zookeeper \
    --link kafka:kafka \
    debezium/kafka \
    watch-topic -a -k dbserver1.public.dumb_table
#Insert records into table - terminal1
-- Name the target columns explicitly so these statements keep working if the table gains columns.
-- NOTE: supplying id by hand bypasses the SERIAL sequence, so later inserts that rely on the
-- default id will start at 1 and eventually collide with ids 10-12.
INSERT INTO dumb_table (id, name) VALUES (10, 'A');
INSERT INTO dumb_table (id, name) VALUES (11, 'B');
INSERT INTO dumb_table (id, name) VALUES (12, 'C');
#Observe change data events in terminal5