Debezium PostgreSQL `decimal.handling.mode=string`
# Modified from: https://github.com/risingwavelabs/risingwave/tree/main/integration_tests/debezium-postgres
# https://github.com/risingwavelabs/risingwave/blob/main/integration_tests/scripts/run_demos.py
docker compose up -d --build
docker compose exec postgres bash -c "psql postgresql://postgresuser:postgrespw@postgres:5432/mydb < postgres_prepare.sql"
curl -S -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/pg-default/config \
-d '{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "pg",
"database.hostname": "postgres",
"database.port": 5432,
"database.user": "postgresuser",
"database.password": "postgrespw",
"database.dbname": "mydb",
"database.server.name": "postgres",
"database.schema": "public",
"database.history.kafka.bootstrap.servers": "message_queue:29092",
"database.history.kafka.topic": "postgres-history",
"time.precision.mode": "adaptive",
"decimal.handling.mode": "string",
"interval.handling.mode": "numeric",
"include.schema.changes": false,
"slot.name": "debezium_1",
"table.include.list": "public.tys"
}'
docker compose exec message_queue rpk topic consume pg.public.tys
---
version: "3"
services:
  message_queue:
    image: "docker.vectorized.io/vectorized/redpanda:latest"
    command:
      - redpanda
      - start
      - "--smp"
      - "1"
      - "--reserve-memory"
      - 0M
      - "--memory"
      - 4G
      - "--overprovisioned"
      - "--node-id"
      - "0"
      - "--check=false"
      - "--kafka-addr"
      - "PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092"
      - "--advertise-kafka-addr"
      - "PLAINTEXT://message_queue:29092,OUTSIDE://localhost:9092"
    expose:
      - "29092"
      - "9092"
      - "9644"
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9644:9644"
      - "8081:8081"
    depends_on: []
    volumes:
      - "message_queue:/var/lib/redpanda/data"
    environment: {}
    container_name: message_queue
    healthcheck:
      test: curl -f localhost:9644/v1/status/ready
      interval: 1s
      timeout: 5s
      retries: 5
    restart: always
  postgres:
    image: debezium/postgres:16-alpine
    ports:
      - 15432:5432
    environment:
      POSTGRES_PASSWORD: postgrespw
      POSTGRES_USER: postgresuser
      POSTGRES_DB: mydb
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "pg_isready -h 127.0.0.1 -U postgresuser -d mydb"
        ]
      interval: 5s
      timeout: 5s
      retries: 5
    container_name: postgres
    volumes:
      - ./postgres_bootstrap.sql:/docker-entrypoint-initdb.d/postgres_bootstrap.sql
      - "./postgres_prepare.sql:/postgres_prepare.sql"
  debezium:
    image: debezium-connect
    build: .
    environment:
      BOOTSTRAP_SERVERS: message_queue:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://message_queue:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://message_queue:8081
    ports:
      - 8083:8083
    healthcheck:
      test: curl -f localhost:8083
      interval: 1s
      start_period: 120s
    depends_on:
      message_queue: { condition: service_healthy }
      postgres: { condition: service_healthy }
    container_name: debezium
  # Check out the connectors via 127.0.0.1:8000
  # kafka-connect-ui:
  #   image: landoop/kafka-connect-ui
  #   platform: linux/amd64
  #   ports:
  #     - 8000:8000
  #   environment:
  #     CONNECT_URL: http://debezium:8083
  #   container_name: kafka-connect-ui
  #   depends_on:
  #     message_queue: { condition: service_healthy }
volumes:
  message_queue:
    external: false
    name: risingwave-compose
FROM quay.io/debezium/connect:2.5.0.Final
ENV AVRO_PLUGIN_DIR=$KAFKA_CONNECT_PLUGINS_DIR/confluent-avro-plugin
ARG CONFLUENT_PLUGIN_VERSION=5.3.0
RUN mkdir $AVRO_PLUGIN_DIR && cd $AVRO_PLUGIN_DIR && \
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/$CONFLUENT_PLUGIN_VERSION/kafka-connect-avro-converter-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-data/$CONFLUENT_PLUGIN_VERSION/kafka-connect-avro-data-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://packages.confluent.io/maven/io/confluent/common-config/$CONFLUENT_PLUGIN_VERSION/common-config-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://packages.confluent.io/maven/io/confluent/common-utils/$CONFLUENT_PLUGIN_VERSION/common-utils-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-schema-serializer/$CONFLUENT_PLUGIN_VERSION/kafka-schema-serializer-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/$CONFLUENT_PLUGIN_VERSION/kafka-schema-registry-client-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/$CONFLUENT_PLUGIN_VERSION/kafka-avro-serializer-$CONFLUENT_PLUGIN_VERSION.jar && \
curl -sO https://repo1.maven.org/maven2/com/google/guava/guava/31.0.1-jre/guava-31.0.1-jre.jar && \
curl -sO https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar && \
curl -sO https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar && \
curl -sO https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar
{
"topic": "pg.public.tys",
"value": "\u0000\u0000\u0000\u0000\u0001\u0000\u0002\u0002\u0006x00\u0002\u0006NAN\u00162.5.0.Final\u0014postgresql\u0004pg\ufffd\ufffd\ufffd\ufffd\ufffdc\u0000\nfirst\u0008mydb\u0002\"[null,\"27581464\"]\u000cpublic\u0006tys\u0002\ufffd\u000b\u0002\ufffd\ufffd\ufffd\u001a\u0000\u0002r\u0002\ufffd\ufffd\ufffd\ufffd\ufffdc\u0000",
"timestamp": 1713865801583,
"partition": 0,
"offset": 0
}
{
"topic": "pg.public.tys",
"value": "\u0000\u0000\u0000\u0000\u0001\u0000\u0002\u0002\u0006x01\u0002\"POSITIVE_INFINITY\u00162.5.0.Final\u0014postgresql\u0004pg\ufffd\ufffd\ufffd\ufffd\ufffdc\u0000\u0008true\u0008mydb\u0002\"[null,\"27581464\"]\u000cpublic\u0006tys\u0002\ufffd\u000b\u0002\ufffd\ufffd\ufffd\u001a\u0000\u0002r\u0002\ufffd\ufffd\ufffd\ufffd\ufffdc\u0000",
"timestamp": 1713865801589,
"partition": 0,
"offset": 1
}
{
"topic": "pg.public.tys",
"value": "\u0000\u0000\u0000\u0000\u0001\u0000\u0002\u0002\u0006x02\u0002\"NEGATIVE_INFINITY\u00162.5.0.Final\u0014postgresql\u0004pg\ufffd\ufffd\ufffd\ufffd\ufffdc\u0000\u0008last\u0008mydb\u0002\"[null,\"27581464\"]\u000cpublic\u0006tys\u0002\ufffd\u000b\u0002\ufffd\ufffd\ufffd\u001a\u0000\u0002r\u0002\ufffd\ufffd\ufffd\ufffd\ufffdc\u0000",
"timestamp": 1713865801589,
"partition": 0,
"offset": 2
}
-- https://www.postgresql.org/docs/current/logical-replication.html
ALTER SYSTEM SET wal_level = logical;
ALTER ROLE postgresuser WITH REPLICATION;
create table tys (k varchar, num numeric);
alter table tys replica identity full;
insert into tys values ('x00', 'nan');
insert into tys values ('x01', 'infinity');
insert into tys values ('x02', '-inf');
Comment from @xiangjinwu (author):

Looks like Debezium recognizes NaN / Infinity / -Infinity from PostgreSQL:
https://github.com/debezium/debezium/blob/v2.6.1.Final/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java#L115-L128

But it outputs the `specialValue.name()` of the Java enum:
https://github.com/debezium/debezium/blob/v2.6.1.Final/debezium-core/src/main/java/io/debezium/data/SpecialValueDecimal.java#L107
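
For illustration, a minimal self-contained sketch of that behavior (the class, enum, and method names below are made up for the example, not Debezium's actual API): an ordinary value is rendered from a `BigDecimal`, while a special value falls back to `Enum.name()`, which is why the payloads above carry the all-caps, underscore-separated constant names instead of PostgreSQL's `NaN` / `Infinity` / `-Infinity` spelling.

```java
import java.math.BigDecimal;

// Sketch only: stand-in names, not Debezium's real classes.
public class SpecialValueNameDemo {
    // Stand-in for the special-value enum described in the comment above.
    enum SpecialValue { NAN, POSITIVE_INFINITY, NEGATIVE_INFINITY }

    // With decimal.handling.mode=string, a regular value can be rendered from BigDecimal,
    // while a special value ends up as Enum.name().
    static String render(BigDecimal decimal, SpecialValue special) {
        return decimal != null ? decimal.toPlainString() : special.name();
    }

    public static void main(String[] args) {
        System.out.println(render(new BigDecimal("27581464"), null));      // 27581464
        System.out.println(render(null, SpecialValue.NAN));                // NAN
        System.out.println(render(null, SpecialValue.POSITIVE_INFINITY));  // POSITIVE_INFINITY
        System.out.println(render(null, SpecialValue.NEGATIVE_INFINITY));  // NEGATIVE_INFINITY
    }
}
```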

The use of the all-capital NAN (but no mention of the infinities) is also acknowledged in the docs:
https://debezium.io/documentation/reference/2.6/connectors/postgresql.html#postgresql-decimal-types

PostgreSQL supports NaN (not a number) as a special value to be stored in DECIMAL/NUMERIC values when the setting of decimal.handling.mode is string or double. In this case, the connector encodes NaN as either Double.NaN or the string constant NAN.
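
Since these spellings match neither PostgreSQL's output (NaN, Infinity, -Infinity) nor Java's `Double.toString()` (NaN, Infinity, -Infinity), a downstream consumer of the string-mode field has to special-case them. A minimal sketch, assuming the decimal column arrives as a plain string after Avro decoding (the helper below is hypothetical, not part of Debezium):

```java
import java.math.BigDecimal;

// Hypothetical consumer-side helper: maps the string-mode decimal field back to a
// numeric value, handling the enum-style spellings observed in the records above.
public final class StringDecimal {
    public static Object parse(String s) {
        switch (s) {
            case "NAN":               return Double.NaN;
            case "POSITIVE_INFINITY": return Double.POSITIVE_INFINITY;
            case "NEGATIVE_INFINITY": return Double.NEGATIVE_INFINITY;
            default:                  return new BigDecimal(s); // ordinary numeric values
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("NAN"));               // NaN
        System.out.println(parse("POSITIVE_INFINITY")); // Infinity
        System.out.println(parse("27581464"));          // 27581464
    }
}
```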
