-
-
Save xiangjinwu/e879234981b65b08a074f27f6bab54ff to your computer and use it in GitHub Desktop.
Debezium PostgreSQL `decimal.handling.mode=string`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Modified from: https://github.com/risingwavelabs/risingwave/tree/main/integration_tests/debezium-postgres
#                https://github.com/risingwavelabs/risingwave/blob/main/integration_tests/scripts/run_demos.py
#
# Runbook: start the stack, load the test table, register the Debezium
# PostgreSQL connector with decimal.handling.mode=string, then print the
# change events produced for public.tys.

# Build and start all services. --wait blocks until every service that defines
# a healthcheck reports healthy, so the Kafka Connect REST API on :8083 is
# accepting requests before the connector is registered below.
docker compose up -d --build --wait

# Create and populate the test table. The script is bind-mounted into the
# postgres container at /postgres_prepare.sql, so reference it by absolute
# path rather than relying on the container's working directory.
docker compose exec postgres bash -c "psql postgresql://postgresuser:postgrespw@postgres:5432/mydb < /postgres_prepare.sql"

# Create (or update) the connector named "pg-default" through the Kafka
# Connect REST API.
curl -S -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/pg-default/config \
  -d '{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "topic.prefix": "pg",
    "database.hostname": "postgres",
    "database.port": 5432,
    "database.user": "postgresuser",
    "database.password": "postgrespw",
    "database.dbname": "mydb",
    "database.server.name": "postgres",
    "database.schema": "public",
    "database.history.kafka.bootstrap.servers": "message_queue:29092",
    "database.history.kafka.topic": "postgres-history",
    "time.precision.mode": "adaptive",
    "decimal.handling.mode": "string",
    "interval.handling.mode": "numeric",
    "include.schema.changes": false,
    "slot.name": "debezium_1",
    "table.include.list": "public.tys"
  }'

# Print the events on the table's topic (<topic.prefix>.<schema>.<table>).
docker compose exec message_queue rpk topic consume pg.public.tys
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
---
# Compose stack for the Debezium PostgreSQL demo: a single-node Redpanda
# broker, a PostgreSQL source database and a Kafka Connect worker running
# Debezium.
version: "3"

services:
  # Redpanda broker. The debezium service below also points its Avro
  # converters at this container on port 8081.
  message_queue:
    image: "docker.vectorized.io/vectorized/redpanda:latest"
    command:
      - "redpanda"
      - "start"
      - "--smp"
      - "1"
      - "--reserve-memory"
      - "0M"
      - "--memory"
      - "4G"
      - "--overprovisioned"
      - "--node-id"
      - "0"
      - "--check=false"
      # Two listeners: PLAINTEXT for containers on the compose network,
      # OUTSIDE for clients connecting through the published host port.
      - "--kafka-addr"
      - "PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092"
      - "--advertise-kafka-addr"
      - "PLAINTEXT://message_queue:29092,OUTSIDE://localhost:9092"
    expose:
      - "29092"
      - "9092"
      - "9644"
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9644:9644"
      - "8081:8081"
    depends_on: []
    volumes:
      - "message_queue:/var/lib/redpanda/data"
    environment: {}
    container_name: message_queue
    healthcheck:
      test: curl -f localhost:9644/v1/status/ready
      interval: 1s
      timeout: 5s
      retries: 5
    restart: always

  # Source database, published on host port 15432.
  postgres:
    image: "debezium/postgres:16-alpine"
    ports:
      - "15432:5432"
    environment:
      POSTGRES_PASSWORD: postgrespw
      POSTGRES_USER: postgresuser
      POSTGRES_DB: mydb
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -h 127.0.0.1 -U postgresuser -d mydb"]
      interval: 5s
      timeout: 5s
      retries: 5
    container_name: postgres
    volumes:
      # Mounted into the image's init directory.
      - "./postgres_bootstrap.sql:/docker-entrypoint-initdb.d/postgres_bootstrap.sql"
      # Mounted at the filesystem root; loaded manually with psql.
      - "./postgres_prepare.sql:/postgres_prepare.sql"

  # Kafka Connect worker, built from the Dockerfile in this directory.
  debezium:
    image: debezium-connect
    build: .
    environment:
      BOOTSTRAP_SERVERS: message_queue:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://message_queue:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://message_queue:8081
    ports:
      - "8083:8083"
    healthcheck:
      test: curl -f localhost:8083
      interval: 1s
      start_period: 120s
    depends_on:
      message_queue:
        condition: service_healthy
      postgres:
        condition: service_healthy
    container_name: debezium

  # Check out the connectors via 127.0.0.1:8000
  # kafka-connect-ui:
  #   image: landoop/kafka-connect-ui
  #   platform: linux/amd64
  #   ports:
  #     - 8000:8000
  #   environment:
  #     CONNECT_URL: http://debezium:8083
  #   container_name: kafka-connect-ui
  #   depends_on:
  #     message_queue: { condition: service_healthy }

volumes:
  message_queue:
    external: false

# NOTE(review): indentation was lost in the captured source. `name` is placed
# at the top level (Compose project name) rather than under the volume;
# confirm against the original file.
name: risingwave-compose
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Debezium Connect image with the Confluent Avro converter and its runtime
# dependencies added as an extra Kafka Connect plugin directory.
FROM quay.io/debezium/connect:2.5.0.Final

# NOTE: "PLUGION" is a misspelling of "PLUGIN". The name is kept unchanged
# because it is exported into the image environment.
# KAFKA_CONNECT_PLUGINS_DIR is not defined here; presumably it is provided by
# the base image - confirm.
ENV AVRO_PLUGION_DIR=$KAFKA_CONNECT_PLUGINS_DIR/confluent-avro-plugin
ARG CONFLUENT_PLUGIN_VERSION=5.3.0

# Download the converter jars. curl runs with -f so that an HTTP error
# (e.g. 404 for a wrong version) fails the build instead of silently saving
# an error page as a .jar; -S keeps the error message visible despite -s.
RUN mkdir -p $AVRO_PLUGION_DIR && cd $AVRO_PLUGION_DIR && \
    curl -fsSO https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/$CONFLUENT_PLUGIN_VERSION/kafka-connect-avro-converter-$CONFLUENT_PLUGIN_VERSION.jar && \
    curl -fsSO https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-data/$CONFLUENT_PLUGIN_VERSION/kafka-connect-avro-data-$CONFLUENT_PLUGIN_VERSION.jar && \
    curl -fsSO https://packages.confluent.io/maven/io/confluent/common-config/$CONFLUENT_PLUGIN_VERSION/common-config-$CONFLUENT_PLUGIN_VERSION.jar && \
    curl -fsSO https://packages.confluent.io/maven/io/confluent/common-utils/$CONFLUENT_PLUGIN_VERSION/common-utils-$CONFLUENT_PLUGIN_VERSION.jar && \
    curl -fsSO https://packages.confluent.io/maven/io/confluent/kafka-schema-serializer/$CONFLUENT_PLUGIN_VERSION/kafka-schema-serializer-$CONFLUENT_PLUGIN_VERSION.jar && \
    curl -fsSO https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/$CONFLUENT_PLUGIN_VERSION/kafka-schema-registry-client-$CONFLUENT_PLUGIN_VERSION.jar && \
    curl -fsSO https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/$CONFLUENT_PLUGIN_VERSION/kafka-avro-serializer-$CONFLUENT_PLUGIN_VERSION.jar && \
    curl -fsSO https://repo1.maven.org/maven2/com/google/guava/guava/31.0.1-jre/guava-31.0.1-jre.jar && \
    curl -fsSO https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar && \
    curl -fsSO https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar && \
    curl -fsSO https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"topic": "pg.public.tys", | |
"value": "\u0000\u0000\u0000\u0000\u0001\u0000\u0002\u0002\u0006x00\u0002\u0006NAN\u00162.5.0.Final\u0014postgresql\u0004pg\ufffd\ufffd\ufffd\ufffd\ufffdc\u0000\nfirst\u0008mydb\u0002\"[null,\"27581464\"]\u000cpublic\u0006tys\u0002\ufffd\u000b\u0002\ufffd\ufffd\ufffd\u001a\u0000\u0002r\u0002\ufffd\ufffd\ufffd\ufffd\ufffdc\u0000", | |
"timestamp": 1713865801583, | |
"partition": 0, | |
"offset": 0 | |
} | |
{ | |
"topic": "pg.public.tys", | |
"value": "\u0000\u0000\u0000\u0000\u0001\u0000\u0002\u0002\u0006x01\u0002\"POSITIVE_INFINITY\u00162.5.0.Final\u0014postgresql\u0004pg\ufffd\ufffd\ufffd\ufffd\ufffdc\u0000\u0008true\u0008mydb\u0002\"[null,\"27581464\"]\u000cpublic\u0006tys\u0002\ufffd\u000b\u0002\ufffd\ufffd\ufffd\u001a\u0000\u0002r\u0002\ufffd\ufffd\ufffd\ufffd\ufffdc\u0000", | |
"timestamp": 1713865801589, | |
"partition": 0, | |
"offset": 1 | |
} | |
{ | |
"topic": "pg.public.tys", | |
"value": "\u0000\u0000\u0000\u0000\u0001\u0000\u0002\u0002\u0006x02\u0002\"NEGATIVE_INFINITY\u00162.5.0.Final\u0014postgresql\u0004pg\ufffd\ufffd\ufffd\ufffd\ufffdc\u0000\u0008last\u0008mydb\u0002\"[null,\"27581464\"]\u000cpublic\u0006tys\u0002\ufffd\u000b\u0002\ufffd\ufffd\ufffd\u001a\u0000\u0002r\u0002\ufffd\ufffd\ufffd\ufffd\ufffdc\u0000", | |
"timestamp": 1713865801589, | |
"partition": 0, | |
"offset": 2 | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Prerequisites for logical replication; see
-- https://www.postgresql.org/docs/current/logical-replication.html

-- Set the WAL level to logical.
ALTER SYSTEM SET wal_level TO 'logical';

-- Grant the REPLICATION attribute to the role used in the connector config.
ALTER ROLE postgresuser REPLICATION;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Test table holding one row per special numeric value (NaN, +/- infinity).
create table tys (k varchar, num numeric);
alter table tys replica identity full;

-- Name the target columns explicitly so the inserts do not depend on the
-- table's column order.
insert into tys (k, num) values ('x00', 'nan');
insert into tys (k, num) values ('x01', 'infinity');
insert into tys (k, num) values ('x02', '-inf');
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It looks like Debezium recognizes `NaN`/`Infinity`/`-Infinity` from PostgreSQL: https://github.com/debezium/debezium/blob/v2.6.1.Final/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java#L115-L128
But it outputs the `specialValue.name()` from the Java enum: https://github.com/debezium/debezium/blob/v2.6.1.Final/debezium-core/src/main/java/io/debezium/data/SpecialValueDecimal.java#L107
The use of all-capital `NAN` (but no mention of infinities) is also acknowledged in the docs: https://debezium.io/documentation/reference/2.6/connectors/postgresql.html#postgresql-decimal-types