-
-
Save tilakpatidar/df43db1e1636d1a957f6f15ddc394ac7 to your computer and use it in GitHub Desktop.
# Run postgres instance
docker run --name postgres -p 5000:5432 debezium/postgres
# Run zookeeper instance
docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
# Run kafka instance
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka
# Run kafka connect
docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3 -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link postgres:postgres --link kafka:kafka debezium/connect
# Open psql console
psql -h localhost -p 5000 -U postgres
CREATE DATABASE inventory;
CREATE TABLE dumb_table(id SERIAL PRIMARY KEY, name VARCHAR);
# Create connector using kafka connect
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "inventory",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}' | jq
# Verify created
curl -H "Accept:application/json" localhost:8083/connectors/ | jq
# Verify configuration
curl -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector | jq
# Start a console viewer on kafka
docker run -it --name watcher --rm --link zookeeper:zookeeper debezium/kafka watch-topic -a -k dbserver1.public.dumb_table
# Verify the existence of replication slot in postgres
SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
Change data capture events, Postgres to Kafka using Debezium
Working commands: please follow the sequence below
#Install Docker Toolbox and open 5 Docker Quickstart terminals
#Ensure all Docker terminals are connected to the same virtual machine
docker-machine ls
docker ps
#IP to use in the following commands, depending on the OS
Windows - IP is the virtual machine's IP
Linux - IP is the respective container's IP
#Run postgres instance - terminal1
docker run --name postgres -p 5000:5432 debezium/postgres
#After the database has started up properly, press
ctrl+c
#Run zookeeper instance - terminal2
docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
#Run kafka instance - terminal3
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka
#Run kafka connect - terminal4
docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3 -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link postgres:postgres --link kafka:kafka debezium/connect
#Connect to psql console and create database - terminal1
docker exec -it POSTGRES-CONTAINER-ID bash;
psql -U postgres
\l
CREATE DATABASE inventory;
\c inventory
CREATE TABLE dumb_table(id SERIAL PRIMARY KEY, name VARCHAR);
#Create connector using kafka connect - terminal5
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" IP:8083/connectors/ -d '
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "inventory",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}' | tac | tac
#Verify created - terminal5
curl -H "Accept:application/json" IP:8083/connectors/ | tac | tac
#Verify configuration - terminal5
curl -X GET -H "Accept:application/json" IP:8083/connectors/inventory-connector | tac | tac
#Start a console viewer on kafka - terminal5
docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka watch-topic -a -k dbserver1.public.dumb_table
#Insert records into table - terminal1
INSERT INTO dumb_table VALUES (10,'A');
INSERT INTO dumb_table VALUES (11,'B');
INSERT INTO dumb_table VALUES (12,'C');
#Observe change data events in terminal5
What would step 4 (#Run kafka connect - terminal1) look like if I want to link Kafka Connect to an AWS Postgres RDS instance rather than running a Postgres instance via Docker? Thanks.
@theprimo thank you, it works for my problem
I see only error logs in terminal 1. How can I solve this problem?
Why can't the Kafka console consumer see any data? Can you help me solve this problem? Thank you!