Debezium installation procedure for PostgreSQL without docker

Debezium installation

1. Overview

There are two main ways to set up the Kafka Connect environment and install the Debezium connectors: a manual installation, or the Debezium Docker images. This document describes the manual installation procedure.

Installation environment requirements:

ℹ️

Note that Java 8 or later is required to run the Debezium connectors.

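The installation procedure is performed in two parts: installing the logical decoding plugin (section 2) and installing the Kafka Connect environment (section 3).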

2. Installing Logical Decoding plugin

Logical decoding is the process of extracting all persistent changes to a database’s tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database’s internal state.

As of PostgreSQL 9.4, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements. In the context of logical replication, a slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database. The output plugins transform the data from the write-ahead log’s internal representation into the format the consumer of a replication slot desires. Plugins are written in C, compiled, and installed on the machine which runs the PostgreSQL server, and they use a number of PostgreSQL specific APIs, as described by the PostgreSQL documentation.

Debezium’s PostgreSQL connector works with one of Debezium’s supported logical decoding plugins, protobuf or wal2json, which encode the changes in Protobuf or JSON format respectively.

⚠️

The Debezium logical decoding plugins have only been installed and tested on Linux machines. Windows and other operating systems may require different installation steps.

⚠️

wal2json limitations

  • wal2json plug-in is not able to process quoted identifiers (issue)

  • wal2json plug-in does not emit events for tables without primary keys

  • wal2json plug-in does not support special values (NaN or infinity) for floating point types

ℹ️

More information about the logical decoding and output plugins can be found at:

2.1 Installation

This installation example uses the wal2json output plugin for logical decoding. The wal2json output plugin produces one JSON object per transaction, and all of the new and old tuples are available in that object. The plugin is compiled and installed by executing the relevant commands extracted from the Debezium Docker image file.

Before executing the commands, make sure that the user has write privileges on the PostgreSQL lib directory, where the wal2json library is installed (in the test environment, the directory is /usr/pgsql-9.6/lib/). The installation process also requires the PostgreSQL utility pg_config. Verify that the PATH environment variable is set so that the utility can be found; if it is not, update PATH appropriately. For example, in the test environment:

export PATH="$PATH:/usr/pgsql-9.6/bin"
wal2json installation commands
$ git clone https://github.com/eulerto/wal2json -b master --single-branch \
&& cd wal2json \
&& git checkout d2b7fef021c46e0d429f2c1768de361069e58696 \
&& make && make install \
&& cd .. \
&& rm -rf wal2json
wal2json installation output
Cloning into 'wal2json'...
remote: Counting objects: 445, done.
remote: Total 445 (delta 0), reused 0 (delta 0), pack-reused 445
Receiving objects: 100% (445/445), 180.70 KiB | 0 bytes/s, done.
Resolving deltas: 100% (317/317), done.
Note: checking out 'd2b7fef021c46e0d429f2c1768de361069e58696'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by performing another checkout.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -b with the checkout command again. Example:

  git checkout -b new_branch_name

HEAD is now at d2b7fef... Improve style
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -fPIC -I. -I./ -I/usr/pgsql-9.6/include/server -I/usr/pgsql-9.6/include/internal -D_GNU_SOURCE -I/usr/include/libxml2  -I/usr/include  -c -o wal2json.o wal2json.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -fPIC -L/usr/pgsql-9.6/lib -Wl,--as-needed  -L/usr/lib64 -Wl,--as-needed -Wl,-rpath,'/usr/pgsql-9.6/lib',--enable-new-dtags  -shared -o wal2json.so wal2json.o
/usr/bin/mkdir -p '/usr/pgsql-9.6/lib'
/usr/bin/install -c -m 755  wal2json.so '/usr/pgsql-9.6/lib/'

2.2 PostgreSQL server configuration

Once the wal2json plugin has been installed, the database server should be configured.

Setting up libraries, WAL and replication parameters

Add the following lines at the end of the postgresql.conf PostgreSQL configuration file to include the plugin in the shared libraries and to adjust some WAL and streaming replication settings. The configuration is extracted from postgresql.conf.sample. You may need to modify it if, for example, shared_preload_libraries already lists other libraries.

postgresql.conf, configuration file parameters settings
############ REPLICATION ##############
# MODULES
shared_preload_libraries = 'wal2json'   # load 'wal2json' plugin (the plugin name is set at makefiles)

# REPLICATION
wal_level = logical                     # use logical decoding with the write-ahead log
max_wal_senders = 4                     # max number of separate processes for processing WAL changes
max_replication_slots = 4               # max number of replication slots to be created for streaming WAL changes

Debezium needs the PostgreSQL WAL to be retained during Debezium outages. If WAL retention is too short and the outage too long, Debezium cannot recover after a restart because it has missed part of the data changes. The usual indicator is an error similar to this one, thrown during startup: ERROR: requested WAL segment 000000010000000000000001 has already been removed.

When this happens, the snapshot of the database must be re-executed. It is also recommended to set the parameter wal_keep_segments = 0. Please follow the official PostgreSQL documentation for fine-tuning WAL retention.

💡

It is strongly recommended to read and understand the official documentation regarding the mechanics and configuration of the PostgreSQL write-ahead log.
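As a quick sanity check of the settings listed above, the following Python sketch parses the text of a postgresql.conf file and reports which replication-related parameters differ from what this guide expects. The helper names parse_pg_conf and check_replication_settings are hypothetical, and the parser is deliberately naive: it assumes simple `name = value  # comment` lines and ignores include directives and the finer quoting rules.

```python
def parse_pg_conf(text):
    """Parse simple 'name = value  # comment' lines into a dict (last one wins)."""
    settings = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # naive: assumes no '#' inside values
        name, sep, value = line.partition("=")
        if not sep:
            continue  # blank line, comment, or a form this sketch does not handle
        settings[name.strip().lower()] = value.strip().strip("'")
    return settings


def check_replication_settings(settings):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    libraries = [lib.strip() for lib in
                 settings.get("shared_preload_libraries", "").split(",")]
    if "wal2json" not in libraries:
        problems.append("shared_preload_libraries does not include wal2json")
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be logical")
    for name in ("max_wal_senders", "max_replication_slots"):
        if int(settings.get(name, "0")) < 1:
            problems.append(f"{name} must be at least 1")
    return problems


sample_conf = """
shared_preload_libraries = 'wal2json'   # load 'wal2json' plugin
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
"""
print(check_replication_settings(parse_pg_conf(sample_conf)))  # prints []
```

In practice the text would be read from the server's actual postgresql.conf; remember that these parameters only take effect after the PostgreSQL server is restarted.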

Setting up replication permissions

Replication can only be performed by a database user that has appropriate permissions and only for a configured number of hosts. In order to give a user replication permissions, define a PostgreSQL role that has at least the REPLICATION and LOGIN permissions. For example:

CREATE ROLE name REPLICATION LOGIN;
💡

Superusers have both of the above permissions by default.

Add the following lines at the end of the pg_hba.conf PostgreSQL configuration file to configure client authentication for database replication. The PostgreSQL server should allow replication to take place between the server machine and the host on which the Debezium PostgreSQL connector is running.

Note that the authentication refers to the database superuser postgres. You may change this accordingly, if some other user with REPLICATION and LOGIN permissions has been created.

pg_hba.conf, configuration file parameters settings
############ REPLICATION ##############
local   replication     postgres                          trust		# allow replication for `postgres` locally
host    replication     postgres  127.0.0.1/32            trust		# receive replication changes using `IPV4`
host    replication     postgres  ::1/128                 trust		# receive replication changes using `IPV6`
💡

See the PostgreSQL documentation for more information on network masks.
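To confirm that pg_hba.conf contains replication entries for the intended user, a small Python sketch such as the following can help. The function name replication_entries is hypothetical, and the parser only understands `local` and `host` records written like the ones above (address in CIDR notation, one user per line); quoted values, comma-separated user lists and include directives are not handled.

```python
def replication_entries(hba_text, user):
    """Return (type, address, method) tuples of replication rules for `user`."""
    entries = []
    for raw in hba_text.splitlines():
        fields = raw.split("#", 1)[0].split()  # drop trailing comments
        if len(fields) < 4:
            continue
        conn_type, database, role = fields[0], fields[1], fields[2]
        if database != "replication" or role not in (user, "all"):
            continue
        if conn_type == "local":
            # local records have no address column
            address, method = "[local]", fields[3]
        elif len(fields) >= 5:
            address, method = fields[3], fields[4]
        else:
            continue
        entries.append((conn_type, address, method))
    return entries


sample_hba = """
local   replication     postgres                          trust   # local socket
host    replication     postgres  127.0.0.1/32            trust   # IPv4
host    replication     postgres  ::1/128                 trust   # IPv6
host    all             all       0.0.0.0/0               md5
"""
for entry in replication_entries(sample_hba, "postgres"):
    print(entry)
```

Keep in mind that the trust method accepts matching connections without a password, so it is best limited to local test environments like the one used here.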

2.3 Database test environment setup

For testing purposes, a database named test with a table named test_table is created with the following DDL commands:

Database SQL commands for test database/table creation
CREATE DATABASE test;

-- Connect to the new database (psql meta-command) so that the table is created in test
\c test

CREATE TABLE test_table (
    id   char(10) NOT NULL,
    code char(10),
    PRIMARY KEY (id)
);

2.4 Decoding output plugin test

Test that the wal2json plugin is working properly by obtaining the test_table changes with pg_recvlogical, the PostgreSQL client application that controls PostgreSQL logical decoding streams.

Before starting, make sure that you are logged in as the user with database replication permissions, as configured in the previous step. Otherwise, slot creation and streaming fail with the following error message:

pg_recvlogical: could not connect to server: FATAL:  no pg_hba.conf entry for replication connection from host "[local]", user "root", SSL off

In the test environment, the user with replication permission is postgres.

Also make sure that the PATH environment variable is set so that pg_recvlogical can be found; if it is not, update PATH appropriately. For example, in the test environment:

export PATH="$PATH:/usr/pgsql-9.6/bin"
  • Create a slot named test_slot for the database named test, using the logical output plugin wal2json

$ pg_recvlogical -d test --slot test_slot --create-slot -P wal2json
  • Begin streaming changes from the logical replication slot test_slot for the database test

$ pg_recvlogical -d test --slot test_slot --start -o pretty-print=1 -f -
  • Perform the basic DML operations on test_table to trigger INSERT/UPDATE/DELETE change events

Interactive PostgreSQL terminal, SQL commands
test=# INSERT INTO test_table (id, code) VALUES('id1', 'code1');
INSERT 0 1
test=# update test_table set code='code2' where id='id1';
UPDATE 1
test=# delete from test_table where id='id1';
DELETE 1

For each INSERT, UPDATE and DELETE event, wal2json outputs the table changes, which pg_recvlogical captures.

Output for INSERT event
{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "test_table",
      "columnnames": ["id", "code"],
      "columntypes": ["character(10)", "character(10)"],
      "columnvalues": ["id1       ", "code1     "]
    }
  ]
}
Output for UPDATE event
{
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "test_table",
      "columnnames": ["id", "code"],
      "columntypes": ["character(10)", "character(10)"],
      "columnvalues": ["id1       ", "code2     "],
      "oldkeys": {
        "keynames": ["id"],
        "keytypes": ["character(10)"],
        "keyvalues": ["id1       "]
      }
    }
  ]
}
Output for DELETE event
{
  "change": [
    {
      "kind": "delete",
      "schema": "public",
      "table": "test_table",
      "oldkeys": {
        "keynames": ["id"],
        "keytypes": ["character(10)"],
        "keyvalues": ["id1       "]
      }
    }
  ]
}
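The three outputs above share one shape: a "change" array whose entries pair columnnames with columnvalues and, for updates and deletes, keynames with keyvalues under oldkeys. The following Python sketch, with the hypothetical helper rows_from_wal2json, shows one way a consumer might turn such a message into plain dictionaries. The sample message is rebuilt by hand from the UPDATE output above rather than read from a live slot.

```python
import json


def rows_from_wal2json(message):
    """Yield (kind, table, new_values, old_keys) for each change in one message."""
    for change in json.loads(message)["change"]:
        # Inserts and updates carry column names/values; deletes do not.
        new_values = dict(zip(change.get("columnnames", []),
                              change.get("columnvalues", [])))
        # Updates and deletes carry the old key under "oldkeys".
        old = change.get("oldkeys", {})
        old_keys = dict(zip(old.get("keynames", []), old.get("keyvalues", [])))
        table = f'{change["schema"]}.{change["table"]}'
        yield change["kind"], table, new_values, old_keys


# Hand-built copy of the UPDATE output; char(10) values are space-padded.
update_message = json.dumps({
    "change": [{
        "kind": "update", "schema": "public", "table": "test_table",
        "columnnames": ["id", "code"],
        "columntypes": ["character(10)", "character(10)"],
        "columnvalues": ["id1".ljust(10), "code2".ljust(10)],
        "oldkeys": {"keynames": ["id"], "keytypes": ["character(10)"],
                    "keyvalues": ["id1".ljust(10)]},
    }]
})

for kind, table, new_values, old_keys in rows_from_wal2json(update_message):
    print(kind, table, new_values, old_keys)
```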
💡

Note that the REPLICA IDENTITY of the table test_table is set to DEFAULT.

When the test is finished, the slot test_slot for the database test can be removed by the following command:

$ pg_recvlogical -d test --slot test_slot --drop-slot
ℹ️

REPLICA IDENTITY is a PostgreSQL-specific table-level setting that determines the amount of information available to logical decoding for UPDATE and DELETE events. There are four possible values: DEFAULT, NOTHING, FULL and INDEX.

You can set the replica identity to FULL for a table by executing the following SQL command:

ALTER TABLE test_table REPLICA IDENTITY FULL;
test=# \d+ test_table
                         Table "public.test_table"
 Column |     Type      | Modifiers | Storage  | Stats target | Description
 -------+---------------+-----------+----------+--------------+------------
 id     | character(10) | not null  | extended |              |
 code   | character(10) |           | extended |              |
Indexes:
    "test_table_pkey" PRIMARY KEY, btree (id)
Replica Identity: FULL

Here is the output of the wal2json plugin for an UPDATE event when REPLICA IDENTITY is set to FULL. Compare it with the respective output when REPLICA IDENTITY is set to DEFAULT.

Output for UPDATE event
{
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "test_table",
      "columnnames": ["id", "code"],
      "columntypes": ["character(10)", "character(10)"],
      "columnvalues": ["id1       ", "code2     "],
      "oldkeys": {
        "keynames": ["id", "code"],
        "keytypes": ["character(10)", "character(10)"],
        "keyvalues": ["id1       ", "code1     "]
      }
    }
  ]
}
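The practical difference between the two settings is that with FULL the oldkeys section carries every column, so a consumer can work out which columns actually changed. A minimal Python sketch of that comparison follows; changed_columns is a hypothetical helper, and the two sample changes are hand-built from the UPDATE outputs shown in this section.

```python
def changed_columns(change):
    """Return {column: (old, new)} for columns whose value differs in an update.

    Only complete when oldkeys carries every column (REPLICA IDENTITY FULL);
    with DEFAULT, oldkeys holds just the primary key, so other columns are skipped.
    """
    new = dict(zip(change["columnnames"], change["columnvalues"]))
    old_keys = change.get("oldkeys", {})
    old = dict(zip(old_keys.get("keynames", []), old_keys.get("keyvalues", [])))
    return {name: (old[name], value) for name, value in new.items()
            if name in old and old[name] != value}


new_row = {"columnnames": ["id", "code"],
           "columnvalues": ["id1".ljust(10), "code2".ljust(10)]}

# REPLICA IDENTITY FULL: oldkeys lists all columns with their previous values.
full_change = dict(new_row, oldkeys={
    "keynames": ["id", "code"],
    "keyvalues": ["id1".ljust(10), "code1".ljust(10)]})

# REPLICA IDENTITY DEFAULT: oldkeys lists only the primary key.
default_change = dict(new_row, oldkeys={
    "keynames": ["id"], "keyvalues": ["id1".ljust(10)]})

print(changed_columns(full_change))     # the code column, old and new value
print(changed_columns(default_change))  # empty: the old code value is unknown
```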

3. Installing Kafka Connect environment

The Kafka Connect environment comes in two flavors: as part of a plain Kafka installation or as part of the Confluent Platform installation. In either case, if you’ve already installed Zookeeper, Kafka, and Kafka Connect, then using one of Debezium’s connectors is easy. Simply download one or more connector plugin archives, extract their files into your Kafka Connect environment, and add the parent directory of the extracted plugin(s) to Kafka Connect’s plugin path.

Kafka installation

Download the Kafka archive (version kafka_2.12-2.1.1 is used) and extract it into a directory (e.g. /opt)

$ tar xzf kafka_2.12-2.1.1.tgz -C /opt
ℹ️
In the rest of the document, the Kafka installation directory is referred to as <KAFKA_INST>.

Confluent Platform installation

Download the Confluent Platform (community version 5.1.2-2.11 is used) and extract it into a directory (e.g. /opt), as described in the manual Install using ZIP and TAR Archives.

$ curl -O http://packages.confluent.io/archive/5.1/confluent-community-5.1.2-2.11.tar.gz
$ tar xzf confluent-community-5.1.2-2.11.tar.gz -C /opt
ℹ️
In the rest of the document, the Confluent installation directory is referred to as <CONFLUENT_INST>.

3.1 Installing Connectors

Before starting the installation, the Kafka Connect worker’s plugin path should be set. The plugin path is a comma-separated list of paths to directories that contain Kafka Connect plugins. It is defined in the Kafka Connect configuration file via the plugin.path parameter.

The configuration file comes in two flavors, depending on the Kafka Connect execution mode: standalone or distributed. In standalone mode the connect-standalone.properties file is used, whereas in distributed mode the connect-distributed.properties file is used. The files are located under the directories <KAFKA_INST>/config and <CONFLUENT_INST>/etc/kafka for the plain Kafka Connect and the Confluent Platform installations respectively.

In order to show both standalone and distributed execution modes, the plain Kafka Connect will be started in standalone mode whereas the Confluent Platform will be started in distributed mode (which is the default one).

ℹ️

More information about Kafka Connectors can be found at:

3.1.1 Plain Kafka Connect

  • Open the <KAFKA_INST>/config/connect-standalone.properties and set the plugin.path parameter appropriately.

  • Download the Postgres Connector and extract it under the Kafka Connect worker’s plugin path as defined previously.

3.1.2 Using Confluent Platform

  • Open the <CONFLUENT_INST>/etc/kafka/connect-distributed.properties and set the plugin.path parameter appropriately (the default value of the parameter points to <CONFLUENT_INST>/share/java).

  • Download the Postgres Connector and extract it under the Kafka Connect worker’s plugin path as defined previously.

3.2 Configuring and Deploying Connectors

Connector configurations are key-value mappings used to set up connectors. For standalone mode, these are defined in a properties file and passed to the Connect process on the command line. In distributed mode, they will be included in the JSON payload sent over the REST API.

Generally, the Debezium configuration file includes parameters related to

  • the database connectivity (database.hostname, database.port, database.user, database.password),

  • the Kafka message key and value format (key.converter, value.converter),

  • what data should be included in the Kafka message (key.converter.schemas.enable, value.converter.schemas.enable),

  • the message structure (Event Flattening).

Here are some parameters with the values used in the current example; you can modify the values according to your needs:

  • name : dbz-test-connector, the logical name of the connector

  • connector.class : io.debezium.connector.postgresql.PostgresConnector, the Debezium PostgreSQL connector class

  • plugin.name : wal2json, the used logical decoding output plugin

  • key.converter : org.apache.kafka.connect.json.JsonConverter, the appropriate converter to serialize the Kafka message key as JSON

  • value.converter : org.apache.kafka.connect.json.JsonConverter, the appropriate converter to serialize the Kafka message value as JSON

  • database.dbname : test, the name of the PostgreSQL database from which to stream the changes

  • database.server.name : DBTestServer, the logical name that identifies and provides a namespace for the particular PostgreSQL database server/cluster being monitored

💡

Before starting, it is recommended to have jq installed. It is a lightweight and flexible command-line JSON processor, like sed for JSON data. You can use it to slice, filter, map and transform structured data, as shown in the tutorial.

3.2.1 Plain Kafka Connect

In the current example, the following configuration for the Debezium Connector is used. Modify the parameter values, if needed, and save the configuration into a file (e.g. <KAFKA_INST>/config/dbz-test-connector.properties).

Plain Kafka Connect, Debezium Connector’s configuration
name=dbz-test-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=wal2json
database.hostname=localhost
database.port=5432
database.user=postgres
database.password=password
database.dbname=test
database.server.name=DBTestServer
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
  • Start the zookeeper server

$ cd <KAFKA_INST>
$ bin/zookeeper-server-start.sh config/zookeeper.properties
  • Start the Kafka server

$ bin/kafka-server-start.sh config/server.properties
  • Deploy the Debezium connector

$ bin/connect-standalone.sh config/connect-standalone.properties config/dbz-test-connector.properties
  • Check the Debezium connector status

$ curl -s localhost:8083/connectors/dbz-test-connector/status | jq .
Debezium Connector’s status output
{
  "name": "dbz-test-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "source"
}
ℹ️
More information about the Kafka Connect REST API can be found in the official documentation.
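When the status check needs to be automated, the JSON returned by the status endpoint can be inspected programmatically. The Python sketch below, with the hypothetical helper unhealthy_parts, flags the connector or any task that is not in the RUNNING state. It works on the sample status output shown above instead of calling the REST API, so it runs without a Kafka Connect worker.

```python
import json


def unhealthy_parts(status):
    """Return descriptions of the connector and tasks that are not RUNNING."""
    problems = []
    if status["connector"]["state"] != "RUNNING":
        problems.append(f'connector is {status["connector"]["state"]}')
    tasks = status.get("tasks", [])
    if not tasks:
        problems.append("no tasks have been started")
    for task in tasks:
        if task["state"] != "RUNNING":
            problems.append(f'task {task["id"]} is {task["state"]}')
    return problems


# Copy of the status output shown above.
status_json = """
{
  "name": "dbz-test-connector",
  "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
  "tasks": [{"state": "RUNNING", "id": 0, "worker_id": "127.0.0.1:8083"}],
  "type": "source"
}
"""
print(unhealthy_parts(json.loads(status_json)))  # prints []
```

An empty list means the connector and all of its tasks report RUNNING; any other state (for example FAILED or PAUSED) is listed with the task id.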

3.2.2 Using Confluent Platform

When using the Confluent Platform, the configuration parameters of the Debezium Connector are the same as for the deployment on plain Kafka Connect. The only difference is that the parameters are represented in JSON format. Modify the parameter values, if needed, and save the configuration into a file (e.g. <CONFLUENT_INST>/etc/kafka/dbz-test-connector.json).

Confluent Platform, Debezium Connector’s configuration
{
  "name": "dbz-test-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "wal2json",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "password",
    "database.dbname" : "test",
    "database.server.name": "DBTestServer",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"false",
    "value.converter.schemas.enable": "false"
  }
}
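Since the two deployments differ only in representation, the JSON form can be derived mechanically from the properties form. The following Python sketch illustrates the mapping; properties_to_payload is a hypothetical helper that handles only plain `key=value` lines, not the full Java properties syntax (no `:` separators, escapes or line continuations).

```python
import json


def properties_to_payload(properties_text):
    """Convert standalone-mode connector properties into the JSON form shown above."""
    config = {}
    for raw in properties_text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # The connector name moves to the top level; everything else goes under "config".
    name = config.pop("name")
    return {"name": name, "config": config}


sample_properties = """
name=dbz-test-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=wal2json
database.dbname=test
"""
print(json.dumps(properties_to_payload(sample_properties), indent=2))
```

All values stay strings, which matches the JSON configuration above (for example "tasks.max": "1").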

Use the Confluent CLI to start the Confluent platform services and load the Debezium Connector.

  • Start Confluent Platform services

$ cd <CONFLUENT_INST>
$ bin/confluent start
  • Deploy the Debezium Connector

$ bin/confluent load dbz-test-connector -d <CONFLUENT_INST>/etc/kafka/dbz-test-connector.json
Debezium Connector’s deployment output
{
  "name": "dbz-test-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "wal2json",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "password",
    "database.dbname": "test",
    "database.server.name": "DBTestServer",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "name": "dbz-test-connector"
  },
  "tasks": [],
  "type": null
}
  • Check the Debezium connector status

$ bin/confluent status dbz-test-connector
Debezium Connector’s status output
{
  "name": "dbz-test-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "source"
}
ℹ️
More information about Confluent CLI can be found at the official documentation.

3.3 Testing Connectors

The Debezium PostgreSQL connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. By default, the Kafka topic name takes the form serverName.schemaName.tableName, where serverName is the logical name of the connector as specified with the database.server.name configuration property, schemaName is the name of the database schema where the operation occurred, and tableName is the name of the database table on which the operation occurred. In our case, the name of the created Kafka topic is DBTestServer.public.test_table.
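The naming rule can be expressed in a couple of lines of Python. Both helpers below (topic_name and topics_for_server) are hypothetical names used for illustration only; the second one mimics filtering a topic listing by the server name prefix.

```python
def topic_name(server_name, schema_name, table_name):
    """Build the default topic name: serverName.schemaName.tableName."""
    return ".".join((server_name, schema_name, table_name))


def topics_for_server(all_topics, server_name):
    """Keep only the topics that belong to the given logical server name."""
    prefix = server_name + "."
    return [topic for topic in all_topics if topic.startswith(prefix)]


print(topic_name("DBTestServer", "public", "test_table"))
# prints DBTestServer.public.test_table
```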

Most PostgreSQL servers are configured to not retain the complete history of the database in the WAL segments, so the PostgreSQL connector would be unable to see the entire history of the database by simply reading the WAL. For this reason, by default the connector performs an initial consistent snapshot of the database on first startup.

💡

For the tests, it is recommended to use kafkacat, a command line utility that helps to test and debug Apache Kafka deployments. It can be used to produce, consume, and list topic and partition information for Kafka. You can download the latest version and install it by following the instructions in the documentation.

In the rest of the document, the kafkacat installation directory is referred to as <KAFKACAT_INST>.

In order to check if the Debezium connector works as expected the following tests can be performed:

  • Check the topic(s) creation for the database table(s)

Verify that Kafka topics have been created for the tables that the connector is applied to (test_table in our example):

$ <KAFKACAT_INST>/kafkacat -b localhost:9092 -L | grep DBTestServer

Alternatively, the kafka-topics Kafka command line tool can be used for the plain Kafka Connect and Confluent Platform deployments as follows:

Plain Kafka Connect, kafka-topics command
$ <KAFKA_INST>/bin/kafka-topics.sh --list --zookeeper localhost:2181 | grep DBTestServer
Confluent Platform, kafka-topics command
$ <CONFLUENT_INST>/bin/kafka-topics --list --zookeeper localhost:2181 | grep DBTestServer

The output of the above commands should include a topic named DBTestServer.public.test_table.

  • Check the initial topic(s) content

Check the messages of the Kafka topic for the respective table (the DBTestServer.public.test_table topic in our case); they should contain an initial snapshot of the database (the output is formatted to be more readable):

$ <KAFKACAT_INST>/kafkacat -b localhost:9092 -t DBTestServer.public.test_table -C -o beginning -f 'Key: %k\nValue: %s\n'
Key: {"id":"id1       "}
Value: {
	"before":null,
	"after":{"id":"id1       ","code":"code2     "},
	"source":{
		"version":"0.9.2",
		"name":"DBTestServer",
		"db":"test",
		"ts_usec":1537191190816000,
		"txId":934261,
		"lsn":3323094832,
		"schema":"public",
		"table":"test_table",
		"snapshot":true,
		"last_snapshot_record":false},
	"op":"r",
	"ts_ms":1537191190817
}
% Reached end of topic DBTestServer.public.test_table [0] at offset 1

Alternatively the kafka-console-consumer Kafka command line tool can be used, for the plain Kafka Connect and Confluent Platform deployments as follows:

Plain Kafka Connect, kafka-console-consumer command
$ <KAFKA_INST>/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic DBTestServer.public.test_table --from-beginning --property print.key=true
Confluent Platform, kafka-console-consumer command
$ <CONFLUENT_INST>/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic DBTestServer.public.test_table --from-beginning --property print.key=true

Indeed, the connector took an initial database snapshot (test_table contains only one record).

  • Monitor the kafka messages produced on table(s) changes

Monitor the messages added to the Kafka topic when the respective table changes (e.g. when a record is inserted, updated or deleted):

$ <KAFKACAT_INST>/kafkacat -b localhost:9092 -t DBTestServer.public.test_table -C -o beginning -f 'Key: %k\nValue: %s\n'

Alternatively the kafka-console-consumer Kafka command line tool can be used as described previously.

Here are the DML operations on test_table and the respective Kafka messages added to the DBTestServer.public.test_table topic (the messages are formatted to be more readable):

Insert a record
test=# INSERT INTO test_table (id, code) VALUES('id2', 'code2');
Insert a record - Kafka message
Key: {"id":"id2       "}
Value: {
	"before":null,
	"after":{"id":"id2       ","code":"code2     "},
	"source":{
		"version":"0.9.2",
		"name":"DBTestServer",
		"db":"test",
		"ts_usec":1537262994443180000,
		"txId":934262,
		"lsn":3323107556,
		"schema":"public",
		"table":"test_table",
		"snapshot":true,
		"last_snapshot_record":true},
	"op":"c",
	"ts_ms":1537262994604
}
Update a record
test=# update test_table set code='code3' where id='id2';
Update a record - Kafka message
Key: {"id":"id2       "}
Value: {
	"before":{"id":"id2       ","code":null},
	"after":{"id":"id2       ","code":"code3     "},
	"source":{
		"version":"0.9.2",
		"name":"DBTestServer",
		"db":"test",
		"ts_usec":1537263061440799000,
		"txId":934263,
		"lsn":3323108190,
		"schema":"public",
		"table":"test_table",
		"snapshot":true,
		"last_snapshot_record":true},
	"op":"u",
	"ts_ms":1537263061474
}
Delete a record
test=# delete from test_table where id='id2';
Delete a record - Kafka message
Key: {"id":"id2       "}
Value: {
	"before":{"id":"id2       ","code":null},
	"after":null,
	"source":{
		"version":"0.9.2",
		"name":"DBTestServer",
		"db":"test",
		"ts_usec":1537263155358693000,
		"txId":934264,
		"lsn":3323108208,
		"schema":"public",
		"table":"test_table",
		"snapshot":true,
		"last_snapshot_record":true},
	"op":"d",
	"ts_ms":1537263155374}
An extra message, the tombstone message, is added when a record is deleted
Key: {"id":"id2       "}
Value:

Debezium’s PostgreSQL connector always follows the delete event with a special tombstone event that has the same key but a null value, so that Kafka log compaction can remove all messages with the same key. This behavior can be controlled via the connector parameter tombstones.on.delete.
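A consumer of this topic therefore has to handle four operation codes seen in the messages above ("r" for snapshot reads, "c", "u" and "d") plus the tombstone, whose value is empty. The Python sketch below shows one possible way to classify a record; describe_event is a hypothetical helper, and it assumes the converters run with schemas.enable=false as configured in this document, so the value is the bare event shown above.

```python
import json

# Operation codes that appear in the "op" field of the messages above.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe_event(key, value):
    """Summarize one Kafka record produced by the connector.

    `key` and `value` are the raw JSON strings; `value` is None or empty
    for a tombstone.
    """
    record_key = json.loads(key)
    if not value:
        return {"key": record_key, "operation": "tombstone", "row": None}
    event = json.loads(value)
    operation = OPERATIONS.get(event["op"], event["op"])
    # Deletes carry the row in "before"; all other operations carry it in "after".
    row = event["before"] if event["op"] == "d" else event["after"]
    return {"key": record_key, "operation": operation, "row": row}


sample_key = json.dumps({"id": "id2".ljust(10)})
delete_value = json.dumps({
    "before": {"id": "id2".ljust(10), "code": None},
    "after": None,
    "op": "d",
    "ts_ms": 1537263155374,
})

print(describe_event(sample_key, delete_value)["operation"])  # prints delete
print(describe_event(sample_key, None)["operation"])          # prints tombstone
```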

4. Resources
