
@rmoff
Last active August 11, 2023 02:58
Kafka Connect JDBC connector - numeric.mapping

Testing numeric.mapping in MS SQL Server 2017

cf. https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html#mapping-column-types

  • Microsoft SQL Server 2017 (RTM-CU13) (KB4466404) - 14.0.3048.4 (X64) - Developer Edition (64-bit) on Linux (Ubuntu 16.04.5 LTS)
  • Confluent Platform 5.1

Summary

| | col1 (AMOUNT_01) | col2 (AMOUNT_02) | col3 (AMOUNT_03) | col4 (AMOUNT_04) |
|---|---|---|---|---|
| MSSQL column definition | DECIMAL(5,2) | NUMERIC(5,2) | DECIMAL(5) | DECIMAL |
| MSSQL created column | decimal, length 5, precision 5, scale 2 | numeric, length 5, precision 5, scale 2 | decimal, length 5, precision 5, scale 0 | decimal, length 9, precision 18, scale 0 |
| Source data in MSSQL | 100.01 | -100.02 | 100 | 100 |
| numeric.mapping = none (same as leaving it unset) | Bytes: '\u0011 | Bytes: Øî | Bytes: d | Bytes: d |
| numeric.mapping = best_fit | Bytes: '\u0011 | Double: -100.02 | Bytes: d | Bytes: d |
| numeric.mapping = best_fit (query used to CAST all DECIMAL fields to NUMERIC) | Double: 100.01 | Double: -100.02 | Int: 100 | Long: 100 |
| numeric.mapping = precision_only | Bytes: '\u0011 | Bytes: Øî | Bytes: d | Bytes: d |

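Although MS SQL accepts both DECIMAL and NUMERIC as data types, numeric.mapping only acts on columns that the JDBC driver reports as NUMERIC; DECIMAL columns are always written as bytes. Use NUMERIC for Kafka Connect to ingest the values as primitive types when using numeric.mapping=best_fit. If changing the source schema isn't an option, you can use query mode to CAST the DECIMAL columns to NUMERIC, demonstrated below.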
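To make the pattern in the summary table easier to reason about, here's a rough POSIX shell sketch of the decision the connector appears to make for each numeric column. It's based on the behaviour observed in these tests plus a reading of the connector source, so treat the thresholds (precision below 19, scale of at least -84) as assumptions to check against the connector version you run. numeric_mapping_category is a hypothetical helper, not part of any tool.

```sh
# Hypothetical helper: numeric_mapping_category MODE JDBC_TYPE PRECISION SCALE
#   MODE       none | precision_only | best_fit
#   JDBC_TYPE  NUMERIC | DECIMAL, as reported by the JDBC driver's metadata
# Prints integer, double or decimal(bytes).
# ASSUMPTION: thresholds (precision < 19, scale >= -84) are inferred, not documented API.
numeric_mapping_category() {
  local mode="$1" jdbc_type="$2" precision="$3" scale="$4"

  # numeric.mapping only looks at NUMERIC columns; DECIMAL columns and
  # mode=none always become a Connect Decimal (Avro bytes).
  if [ "$jdbc_type" != "NUMERIC" ] || [ "$mode" = "none" ]; then
    echo "decimal(bytes)"
    return 0
  fi

  case "$mode" in
    precision_only)
      # Only zero-scale columns that fit in a 64-bit integer are remapped
      if [ "$scale" -eq 0 ] && [ "$precision" -lt 19 ]; then
        echo "integer"; return 0
      fi
      ;;
    best_fit)
      if [ "$precision" -lt 19 ]; then
        # Non-positive scale -> integer, positive scale -> double
        if [ "$scale" -lt 1 ] && [ "$scale" -ge -84 ]; then
          echo "integer"; return 0
        fi
        if [ "$scale" -gt 0 ]; then
          echo "double"; return 0
        fi
      fi
      ;;
  esac
  # Anything else (e.g. precision 38, or Oracle's unset scale of -127) stays a Decimal
  echo "decimal(bytes)"
}

# The four MSSQL columns under best_fit, using the metadata from the summary table
for col in "DECIMAL 5 2" "NUMERIC 5 2" "DECIMAL 5 0" "DECIMAL 18 0"; do
  # $col is deliberately unquoted so it splits into three arguments
  echo "$col -> $(numeric_mapping_category best_fit $col)"
done
```

The loop prints decimal(bytes), double, decimal(bytes), decimal(bytes), which matches the best_fit row of the summary table.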

Create source table in MS SQL

CREATE TABLE demo.dbo.NUM_TEST (
	TXN_ID INT,
	CUSTOMER_ID INT,
	AMOUNT_01 DECIMAL(5,2),
	AMOUNT_02 NUMERIC(5,2), 
	AMOUNT_03 DECIMAL(5),
	AMOUNT_04 DECIMAL
);

INSERT INTO demo..NUM_TEST VALUES (42,42,100.01, -100.02, 100, 100);

Default behaviour (same as none)

numeric.mapping is left unset.

  • Create connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_mssql_01",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
                      "connection.user": "connect_user",
                      "connection.password": "Asgard123",
                      "topic.prefix": "mssql-01-",
                      "table.whitelist" : "demo.dbo.NUM_TEST",
                      "mode":"bulk",
                      "poll.interval.ms" : 3600000
                      }
              }'
    
  • Output data - all DECIMAL and NUMERIC fields are bytes

    ksql> PRINT 'mssql-01-NUM_TEST' FROM BEGINNING;
    Format:AVRO
    1/8/19 1:06:17 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": {"bytes": "Øî"}, "AMOUNT_03": {"bytes": "d"}, "AMOUNT_04": {"bytes": "d"}}
    
  • Schema

    $ curl -s "http://localhost:8081/subjects/mssql-01-NUM_TEST-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
    {
      "name": "AMOUNT_01",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "AMOUNT_02",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "AMOUNT_03",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "AMOUNT_04",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
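The bytes values printed by ksql are Avro decimals: the unscaled value as big-endian two's-complement bytes, shown one byte per character (so '\u0011 is 0x27 0x11, and Øî is 0xD8 0xEE). Combined with the scale from the schema above, the original number can be recovered. A minimal sketch, assuming you have already turned the characters into a hex string; decode_avro_decimal is a hypothetical helper, limited to 7 bytes so it stays within shell integer arithmetic.

```sh
# Hypothetical helper: decode_avro_decimal HEX SCALE
# Avro's decimal logical type stores the unscaled value as big-endian
# two's-complement bytes; the real value is unscaled / 10^scale.
decode_avro_decimal() {
  local hex="$1" scale="$2"
  # Stay within 64-bit shell arithmetic: at most 7 bytes (14 hex digits)
  if [ "${#hex}" -gt 14 ]; then
    echo "decode_avro_decimal: value too long for shell arithmetic" >&2
    return 1
  fi
  local nbits=$(( ${#hex} * 4 ))
  local unscaled=$(( 0x$hex ))
  # Top bit set means negative: subtract 2^nbits
  if [ "$unscaled" -ge $(( 1 << (nbits - 1) )) ]; then
    unscaled=$(( unscaled - (1 << nbits) ))
  fi
  local sign="" abs="$unscaled"
  if [ "$abs" -lt 0 ]; then
    sign="-"
    abs=$(( -abs ))
  fi
  if [ "$scale" -eq 0 ]; then
    echo "$sign$abs"
    return 0
  fi
  # divisor = 10^scale
  local divisor=1 i=0
  while [ "$i" -lt "$scale" ]; do
    divisor=$(( divisor * 10 ))
    i=$(( i + 1 ))
  done
  # Zero-pad the fractional digits to SCALE places (e.g. 1 -> 01 for scale 2)
  local frac
  frac=$(printf "%0${scale}d" $(( abs % divisor )))
  echo "$sign$(( abs / divisor )).$frac"
}

decode_avro_decimal 2711 2   # AMOUNT_01 '\u0011 -> 100.01
decode_avro_decimal d8ee 2   # AMOUNT_02 Øî -> -100.02
decode_avro_decimal 64 0     # AMOUNT_03 d -> 100
```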
    

best_fit

  • Create connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_mssql_02",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
                      "connection.user": "connect_user",
                      "connection.password": "Asgard123",
                      "topic.prefix": "mssql-02-",
                      "table.whitelist" : "demo.dbo.NUM_TEST",
                      "mode":"bulk",
                      "numeric.mapping": "best_fit",
                      "poll.interval.ms" : 3600000
                      }
              }'
    
  • Output data - NUMERIC field is DOUBLE (FLOAT), but all DECIMAL fields remain as bytes

    ksql> PRINT 'mssql-02-NUM_TEST' FROM BEGINNING;
    Format:AVRO
    1/8/19 2:07:06 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": -100.02, "AMOUNT_03": {"bytes": "d"}, "AMOUNT_04": {"bytes": "d"}}
    
  • Schema

    $ curl -s "http://localhost:8081/subjects/mssql-02-NUM_TEST-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
    {
      "name": "AMOUNT_01",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "AMOUNT_02",
      "type": [
        "null",
        "double"
      ],
      "default": null
    }
    {
      "name": "AMOUNT_03",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "AMOUNT_04",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
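The connectors in these tests differ only in name, topic.prefix and numeric.mapping. If you're repeating the tests, a small shell function can generate the config; jdbc_source_config is a hypothetical helper, and the connection details are the MSSQL ones used above. It only prints the JSON, so you can check it before sending it to the Connect REST API.

```sh
# Hypothetical helper: jdbc_source_config NAME TOPIC_PREFIX MODE
# Prints the connector JSON used in these MSSQL tests for one numeric.mapping MODE.
# MODE=none leaves numeric.mapping unset, like the "default behaviour" test.
jdbc_source_config() {
  local name="$1" prefix="$2" mode="$3"
  local mapping=""
  if [ "$mode" != "none" ]; then
    mapping="\"numeric.mapping\": \"$mode\","
  fi
  cat <<EOF
{
  "name": "$name",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
    "connection.user": "connect_user",
    "connection.password": "Asgard123",
    "topic.prefix": "$prefix",
    "table.whitelist": "demo.dbo.NUM_TEST",
    "mode": "bulk",
    $mapping
    "poll.interval.ms": 3600000
  }
}
EOF
}

# Preview the best_fit config. To create it, pipe the output to the Connect REST API:
#   jdbc_source_config jdbc_source_mssql_02 mssql-02- best_fit |
#     curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" --data-binary @-
jdbc_source_config jdbc_source_mssql_02 mssql-02- best_fit
```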
    

precision_only

  • Create connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_mssql_03",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
                      "connection.user": "connect_user",
                      "connection.password": "Asgard123",
                      "topic.prefix": "mssql-03-",
                      "table.whitelist" : "demo.dbo.NUM_TEST",
                      "mode":"bulk",
                      "numeric.mapping": "precision_only",
                      "poll.interval.ms" : 3600000
                      }
              }'
    
  • Output data - all DECIMAL and NUMERIC fields are still bytes. This is because precision_only only maps zero-scale NUMERIC columns, and the only zero-scale fields here (AMOUNT_03 and AMOUNT_04) are DECIMAL, not NUMERIC. If there were a NUMERIC field with zero scale, we would expect it to be mapped to INT here.

    ksql> PRINT 'mssql-03-NUM_TEST' FROM BEGINNING;
    Format:AVRO
    1/8/19 2:09:16 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": {"bytes": "Øî"}, "AMOUNT_03": {"bytes": "d"}, "AMOUNT_04": {"bytes": "d"}}
    
  • Schema

    $ curl -s "http://localhost:8081/subjects/mssql-03-NUM_TEST-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
    {
      "name": "AMOUNT_01",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "AMOUNT_02",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "AMOUNT_03",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "AMOUNT_04",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    

Casting DECIMAL fields to NUMERIC using query mode

  • Create connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_mssql_04",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:sqlserver://mssql:1433;databaseName=demo",
                      "connection.user": "connect_user",
                      "connection.password": "Asgard123",
                      "topic.prefix": "mssql-04-NUM_TEST",
                      "query" : "SELECT TXN_ID ,CUSTOMER_ID ,CAST(AMOUNT_01 AS NUMERIC(5,2)) AS AMOUNT_01 ,AMOUNT_02 ,CAST(AMOUNT_03 AS NUMERIC(5)) AS AMOUNT_03 ,CAST(AMOUNT_04 AS NUMERIC) AS AMOUNT_04 FROM NUM_TEST",
                      "mode":"bulk",
                      "numeric.mapping": "best_fit",
                      "poll.interval.ms" : 3600000
                      }
              }'
    
  • Output data - NUMERIC fields are DOUBLE (FLOAT) and INT as appropriate

    ksql> PRINT 'mssql-04-NUM_TEST' FROM BEGINNING;
    Format:AVRO
    1/8/19 2:18:30 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": 100.01, "AMOUNT_02": -100.02, "AMOUNT_03": 100, "AMOUNT_04": 100}
    
  • Schema

    $ curl -s "http://localhost:8081/subjects/mssql-04-NUM_TEST-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
    {
      "name": "AMOUNT_01",
      "type": [
        "null",
        "double"
      ],
      "default": null
    }
    {
      "name": "AMOUNT_02",
      "type": [
        "null",
        "double"
      ],
      "default": null
    }
    {
      "name": "AMOUNT_03",
      "type": [
        "null",
        "int"
      ],
      "default": null
    }
    {
      "name": "AMOUNT_04",
      "type": [
        "null",
        "long"
      ],
      "default": null
    }
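In the query-mode schema, AMOUNT_03 and AMOUNT_04 are both zero-scale NUMERIC, yet one is int and the other long. That follows from precision: CAST(AMOUNT_04 AS NUMERIC) uses SQL Server's default precision of 18 (compare the unqualified DECIMAL column in the summary table), while AMOUNT_03 is NUMERIC(5). The sketch below shows how precision appears to select the integer width; the cut-offs are an assumption taken from the connector source and are consistent with what's observed here, and connect_int_type is a hypothetical helper.

```sh
# Hypothetical helper: connect_int_type PRECISION
# For a zero-scale NUMERIC column, prints the Connect integer type that the
# precision appears to select, and how the Avro converter writes it.
# ASSUMPTION: precision < 19; larger precisions stay as bytes (Connect Decimal).
connect_int_type() {
  local precision="$1"
  if [ "$precision" -gt 9 ]; then
    echo "INT64 (Avro long)"
  elif [ "$precision" -gt 4 ]; then
    echo "INT32 (Avro int)"
  elif [ "$precision" -gt 2 ]; then
    echo "INT16 (Avro int, connect.type int16)"
  else
    echo "INT8 (Avro int, connect.type int8)"
  fi
}

connect_int_type 5    # CAST(AMOUNT_03 AS NUMERIC(5))
connect_int_type 18   # CAST(AMOUNT_04 AS NUMERIC), SQL Server default precision 18
```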
    

Testing numeric.mapping in Oracle

cf. https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html#mapping-column-types

  • Oracle Database 12c Enterprise Edition Release 12.2.0.1.0 - 64bit Production
  • Confluent Platform 5.1

Summary

| | TXN_ID | CUSTOMER_ID | AMOUNT_01 | AMOUNT_02 | AMOUNT_03 | AMOUNT_04 | AMOUNT_05 | AMOUNT_06 | AMOUNT_07 |
|---|---|---|---|---|---|---|---|---|---|
| Oracle DDL | INT | INT | DECIMAL(5,2) | NUMERIC(5,2) | DECIMAL(5) | DECIMAL | NUMBER(5,2) | NUMBER(5) | NUMBER |
| Oracle created column | NUMBER(38) | NUMBER(38) | NUMBER(5,2) | NUMBER(5,2) | NUMBER(5) | NUMBER(38) | NUMBER(5,2) | NUMBER(5) | NUMBER |
| Source data INSERTed to Oracle | 42 | 42 | 100.01 | -100.02 | 100.03 | 100.04 | 100.05 | 100.06 | 100.07 |
| Data SELECTed from Oracle | 42 | 42 | 100.01 | -100.02 | 100 | 100 | 100.05 | 100 | 100.07 |
| numeric.mapping = none (same as leaving it unset) | Bytes: * | Bytes: * | Bytes: '\u0011 | Bytes: Øî | Bytes: d | Bytes: d | Bytes: '\u0015 | Bytes: d | Bytes: \u0017\u0018¦ÚÿôbÙJ31\u008D6ë-vA\u0099Üåù_'c\u0005ÔÒÒÑCìæÊ£\nð÷)À\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000 |
| numeric.mapping = best_fit | Bytes: * | Bytes: * | Double: 100.01 | Double: -100.02 | Int: 100 | Bytes: d | Double: 100.05 | Int: 100 | Bytes: \u0017\u0018¦ÚÿôbÙJ31\u008D6ë-vA\u0099Üåù_'c\u0005ÔÒÒÑCìæÊ£\nð÷)À\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000 |
| numeric.mapping = precision_only | Bytes: * | Bytes: * | Bytes: '\u0011 | Bytes: Øî | Int: 100 | Bytes: d | Bytes: '\u0015 | Int: 100 | Bytes: \u0017\u0018¦ÚÿôbÙJ31\u008D6ë-vA\u0099Üåù_'c\u0005ÔÒÒÑCìæÊ£\nð÷)À\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000 |
| numeric.mapping = best_fit (query used to CAST fields to NUMBER with explicit precision and scale) | Int: 42 | Int: 42 | Double: 100.01 | Double: -100.02 | Int: 100 | Double: 100.0 | Double: 100.05 | Double: 100.0 | Double: 100.07 |

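  • For numeric.mapping to take effect, the NUMBER column needs a declared precision small enough to fit a primitive type (NUMBER(5) and NUMBER(5,2) worked in these tests). Oracle creates INT and DECIMAL columns without a declared precision as NUMBER(38), which is too large for Connect to store as anything other than bytes (Decimal/BigDecimal). A bare NUMBER column has no declared precision at all and is also written as bytes, with a schema scale of 127.
  • Therefore, don't create columns as NUMBER (or INT, or DECIMAL without a precision); create them as, for example, NUMBER(9,2), or however big they need to be to store the values.
  • If modifying the schema isn't an option, you can use the Kafka Connect JDBC source connector's query option to cast the source data to appropriate data types.
  • N.B. Oracle treats DECIMAL, NUMERIC, and INT as NUMBER fields.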
  • Ref: Oracle NUMBER data type
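Writing the CAST query by hand for a wide table gets tedious. Below is a sketch of a helper that builds the SELECT for the connector's query option from a list of column names, casting only those given a :TYPE suffix. oracle_cast_query is hypothetical, and you still need to choose a precision big enough for your data; otherwise the CAST is likely to fail for values that don't fit.

```sh
# Hypothetical helper: oracle_cast_query TABLE COLUMN[:TYPE]...
# Builds a SELECT for the connector's "query" option. Columns given as NAME:TYPE
# are wrapped in CAST(NAME AS TYPE) AS NAME; plain NAMEs are selected as-is.
oracle_cast_query() {
  local table="$1"
  shift
  local select_list="" item col column_expr
  for item in "$@"; do
    col=${item%%:*}
    if [ "$col" = "$item" ]; then
      column_expr="$col"
    else
      column_expr="CAST($col AS ${item#*:}) AS $col"
    fi
    # Append with a comma separator after the first column
    select_list="${select_list:+$select_list, }$column_expr"
  done
  echo "SELECT $select_list FROM $table"
}

# Reproduces part of the query used by jdbc_source_oracle_04 below
oracle_cast_query NUM_TEST 'TXN_ID:NUMBER(5,0)' AMOUNT_01 'AMOUNT_04:NUMBER(5,2)' 'AMOUNT_07:NUMBER(5,2)'
```

This prints SELECT CAST(TXN_ID AS NUMBER(5,0)) AS TXN_ID, AMOUNT_01, CAST(AMOUNT_04 AS NUMBER(5,2)) AS AMOUNT_04, CAST(AMOUNT_07 AS NUMBER(5,2)) AS AMOUNT_07 FROM NUM_TEST, ready to paste into the "query" setting.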

Create source table in Oracle

CREATE TABLE NUM_TEST (
	TXN_ID INT,
	CUSTOMER_ID INT,
	AMOUNT_01 DECIMAL(5,2),
	AMOUNT_02 NUMERIC(5,2), 
	AMOUNT_03 DECIMAL(5),
	AMOUNT_04 DECIMAL,
	AMOUNT_05 NUMBER(5,2), 
	AMOUNT_06 NUMBER(5), 
	AMOUNT_07 NUMBER
);

INSERT INTO NUM_TEST VALUES (42,42,100.01, -100.02, 100.03, 100.04, 100.05, 100.06, 100.07);

Results

Default behaviour (same as none)

numeric.mapping is left unset.

  • Create connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_oracle_01",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
                      "connection.user": "connect_user",
                      "connection.password": "asgard",
                      "topic.prefix": "oracle-01-",
                      "table.whitelist" : "NUM_TEST",
                      "mode":"bulk",
                      "poll.interval.ms" : 3600000
                      }
              }'
    
  • Output data - all fields are written as bytes (including those specified as INT)

    ksql> PRINT 'oracle-01-NUM_TEST' FROM BEGINNING;
    Format:AVRO
    1/9/19 12:04:34 PM UTC, null, {"TXN_ID": {"bytes": "*"}, "CUSTOMER_ID": {"bytes": "*"}, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": {"bytes": "Øî"}, "AMOUNT_03": {"bytes": "d"}, "AMOUNT_04": {"bytes": "d"}, "AMOUNT_05": {"bytes": "'\u0015"}, "AMOUNT_06": {"bytes": "d"}, "AMOUNT_07": {"bytes": "\u0017\u0019> ëA³§­f)Íî¥Ã¬\u0088z ±³ºÈ0µ)ÜÒ\u0095)\u008DííY\u0094u¹\u0004`\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"}}
    

best_fit

  • Create connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_oracle_02",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
                      "connection.user": "connect_user",
                      "connection.password": "asgard",
                      "topic.prefix": "oracle-02-",
                      "table.whitelist" : "NUM_TEST",
                      "mode":"bulk",
                      "numeric.mapping": "best_fit",
                      "poll.interval.ms" : 3600000
                      }
              }'
    
  • Output data

    ksql> PRINT 'oracle-02-NUM_TEST' FROM BEGINNING;
    Format:AVRO
    1/9/19 12:04:52 PM UTC, null, {"TXN_ID": {"bytes": "*"}, "CUSTOMER_ID": {"bytes": "*"}, "AMOUNT_01": 100.01, "AMOUNT_02": -100.02, "AMOUNT_03": 100, "AMOUNT_04": {"bytes": "d"}, "AMOUNT_05": 100.05, "AMOUNT_06": 100, "AMOUNT_07": {"bytes": "\u0017\u0019> ëA³§­f)Íî¥Ã¬\u0088z ±³ºÈ0µ)ÜÒ\u0095)\u008DííY\u0094u¹\u0004`\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"}}
    

precision_only

  • Create connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_oracle_03",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
                      "connection.user": "connect_user",
                      "connection.password": "asgard",
                      "topic.prefix": "oracle-03-",
                      "table.whitelist" : "NUM_TEST",
                      "mode":"bulk",
                      "numeric.mapping": "precision_only",
                      "poll.interval.ms" : 3600000
                      }
              }'
    
  • Output data

    ksql> PRINT 'oracle-03-NUM_TEST' FROM BEGINNING;
    Format:AVRO
    1/9/19 12:05:01 PM UTC, null, {"TXN_ID": {"bytes": "*"}, "CUSTOMER_ID": {"bytes": "*"}, "AMOUNT_01": {"bytes": "'\u0011"}, "AMOUNT_02": {"bytes": "Øî"}, "AMOUNT_03": 100, "AMOUNT_04": {"bytes": "d"}, "AMOUNT_05": {"bytes": "'\u0015"}, "AMOUNT_06": 100, "AMOUNT_07": {"bytes": "\u0017\u0019> ëA³§­f)Íî¥Ã¬\u0088z ±³ºÈ0µ)ÜÒ\u0095)\u008DííY\u0094u¹\u0004`\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"}}
    

Casting fields to NUMBER with appropriate precision & scale using query mode

  • Create connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_oracle_04",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
                      "connection.user": "connect_user",
                      "connection.password": "asgard",
                      "topic.prefix": "oracle-04-NUM_TEST",
                      "mode":"bulk",
                      "numeric.mapping": "best_fit",
                      "query" : "SELECT CAST(TXN_ID AS NUMBER(5,0)) AS TXN_ID,CAST(CUSTOMER_ID AS NUMBER(5,0)) AS CUSTOMER_ID, AMOUNT_01 ,AMOUNT_02 ,AMOUNT_03 ,CAST(AMOUNT_04 AS NUMBER(5,2)) AS AMOUNT_04 ,AMOUNT_05, CAST(AMOUNT_06 AS NUMBER(5,2)) AS AMOUNT_06, CAST(AMOUNT_07 AS NUMBER(5,2)) AS AMOUNT_07 FROM NUM_TEST",
                      "poll.interval.ms" : 3600000
                      }
              }'
    
  • Output data - all fields are DOUBLE (FLOAT) and INT as appropriate

    ksql> PRINT 'oracle-04-NUM_TEST' FROM BEGINNING;
    Format:AVRO
    1/9/19 12:12:19 PM UTC, null, {"TXN_ID": 42, "CUSTOMER_ID": 42, "AMOUNT_01": 100.01, "AMOUNT_02": -100.02, "AMOUNT_03": 100, "AMOUNT_04": 100.0, "AMOUNT_05": 100.05, "AMOUNT_06": 100.0, "AMOUNT_07": 100.07}
    

Schemas

Default behaviour (same as none)

$ curl -s "http://localhost:8081/subjects/oracle-01-NUM_TEST-value/versions/latest"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
  "name": "AMOUNT_01",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
{
  "name": "AMOUNT_02",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
{
  "name": "AMOUNT_03",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 0,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "0"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
{
  "name": "AMOUNT_04",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 0,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "0"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
{
  "name": "AMOUNT_05",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
{
  "name": "AMOUNT_06",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 0,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "0"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
{
  "name": "AMOUNT_07",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 127,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "127"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}

best_fit

$ curl -s "http://localhost:8081/subjects/oracle-02-NUM_TEST-value/versions/latest"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
  "name": "AMOUNT_01",
  "type": [
    "null",
    "double"
  ],
  "default": null
}
{
  "name": "AMOUNT_02",
  "type": [
    "null",
    "double"
  ],
  "default": null
}
{
  "name": "AMOUNT_03",
  "type": [
    "null",
    "int"
  ],
  "default": null
}
{
  "name": "AMOUNT_04",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 0,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "0"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
{
  "name": "AMOUNT_05",
  "type": [
    "null",
    "double"
  ],
  "default": null
}
{
  "name": "AMOUNT_06",
  "type": [
    "null",
    "int"
  ],
  "default": null
}
{
  "name": "AMOUNT_07",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 127,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "127"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}

precision_only

$ curl -s "http://localhost:8081/subjects/oracle-03-NUM_TEST-value/versions/latest"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
  "name": "AMOUNT_01",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
{
  "name": "AMOUNT_02",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
{
  "name": "AMOUNT_03",
  "type": [
    "null",
    "int"
  ],
  "default": null
}
{
  "name": "AMOUNT_04",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 0,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "0"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
{
  "name": "AMOUNT_05",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
{
  "name": "AMOUNT_06",
  "type": [
    "null",
    "int"
  ],
  "default": null
}
{
  "name": "AMOUNT_07",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 127,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "127"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}

Casting fields to NUMBER with appropriate precision & scale using query mode

$ curl -s "http://localhost:8081/subjects/oracle-04-NUM_TEST-value/versions/latest"|jq '.schema|fromjson.fields[] | select (.name | contains("AMOUNT"))'
{
  "name": "AMOUNT_01",
  "type": [
    "null",
    "double"
  ],
  "default": null
}
{
  "name": "AMOUNT_02",
  "type": [
    "null",
    "double"
  ],
  "default": null
}
{
  "name": "AMOUNT_03",
  "type": [
    "null",
    "int"
  ],
  "default": null
}
{
  "name": "AMOUNT_04",
  "type": [
    "null",
    "double"
  ],
  "default": null
}
{
  "name": "AMOUNT_05",
  "type": [
    "null",
    "double"
  ],
  "default": null
}
{
  "name": "AMOUNT_06",
  "type": [
    "null",
    "double"
  ],
  "default": null
}
{
  "name": "AMOUNT_07",
  "type": [
    "null",
    "double"
  ],
  "default": null
}

Testing numeric.mapping in Postgres

cf. https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html#mapping-column-types

  • Postgres 11.1
  • Confluent Platform 5.1

Summary

| | col1 (amount_01) | col2 (amount_02) | col3 (amount_03) | col4 (amount_04) |
|---|---|---|---|---|
| Postgres column definition | DECIMAL(5,2) | NUMERIC(5,2) | DECIMAL(5) | DECIMAL |
| Source data in Postgres | 100.01 | -100.02 | 100 | 100 |
| numeric.mapping = none (same as leaving it unset) | Bytes: '\u0011 | Bytes: Øî | Bytes: d | Bytes: d |
| numeric.mapping = best_fit | Double: 100.01 | Double: -100.02 | Int: 100 | Int: 100 |
| numeric.mapping = precision_only | Bytes: '\u0011 | Bytes: Øî | Int: 100 | Int: 100 |

Create source table in Postgres

CREATE TABLE demo.NUM_TEST (
	TXN_ID INT,
	CUSTOMER_ID INT,
	AMOUNT_01 DECIMAL(5,2),
	AMOUNT_02 NUMERIC(5,2), 
	AMOUNT_03 DECIMAL(5),
	AMOUNT_04 DECIMAL
);

-- precision is the total number of significant digits stored for a value;
-- scale is the number of digits that can be stored after the decimal point.

INSERT INTO demo.NUM_TEST VALUES (42,42,100.01, -100.02, 100, 100);

-- postgres=# SELECT * FROM demo.NUM_TEST;
--  txn_id | customer_id | amount_01 | amount_02 | amount_03 | amount_04
-- --------+-------------+-----------+-----------+-----------+-----------
--      42 |          42 |    100.01 |   -100.02 |       100 |    100.04
--      42 |          42 |    200.01 |   -200.02 |       200 |       200
-- (1 row)

-- postgres=# \d demo.NUM_TEST;
--                     Table "demo.num_test"
--    Column    |     Type     | Collation | Nullable | Default
-- -------------+--------------+-----------+----------+---------
--  txn_id      | integer      |           |          |
--  customer_id | integer      |           |          |
--  amount_01   | numeric(5,2) |           |          |
--  amount_02   | numeric(5,2) |           |          |
--  amount_03   | numeric(5,0) |           |          |
--  amount_04   | numeric      |           |          |

Default behaviour

numeric.mapping is left unset.

  • Create connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_postgres_15",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                      "connection.user": "connect_user",
                      "connection.password": "asgard",
                      "topic.prefix": "postgres-15-",
                      "table.whitelist" : "demo.num_test",
                      "mode":"bulk",
                      "poll.interval.ms" : 3600000
                      }
              }'
    
  • Output data - all NUMERIC fields are bytes

    ksql> print 'postgres-15-num_test' from beginning;
    Format:AVRO
    1/7/19 4:25:13 PM UTC, null, {"txn_id": 42, "customer_id": 42, "amount_01": {"bytes": "'\u0011"}, "amount_02": {"bytes": "Øî"}, "amount_03": {"bytes": "d"}, "amount_04": {"bytes": "d"}}
    
  • Schema

    $ curl -s "http://localhost:8081/subjects/postgres-15-num_test-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("amount"))'
    {
      "name": "amount_01",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "amount_02",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "amount_03",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "amount_04",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    

best_fit

  • Create connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_postgres_12",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                      "connection.user": "connect_user",
                      "connection.password": "asgard",
                      "topic.prefix": "postgres-12-",
                      "numeric.mapping": "best_fit",
                      "table.whitelist" : "demo.num_test",
                      "mode":"bulk",
                      "poll.interval.ms" : 3600000
                      }
              }'
    
  • Output data - NUMERIC fields with a non-zero scale are DOUBLE (FLOAT), zero-scale fields are INT

    ksql> print 'postgres-12-num_test' from beginning;
    Format:AVRO
    1/7/19 4:27:08 PM UTC, null, {"txn_id": 42, "customer_id": 42, "amount_01": 100.01, "amount_02": -100.02, "amount_03": 100, "amount_04": 100}
    
  • Schema

    $ curl -s "http://localhost:8081/subjects/postgres-12-num_test-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("amount"))'
    {
      "name": "amount_01",
      "type": [
        "null",
        "double"
      ],
      "default": null
    }
    {
      "name": "amount_02",
      "type": [
        "null",
        "double"
      ],
      "default": null
    }
    {
      "name": "amount_03",
      "type": [
        "null",
        "int"
      ],
      "default": null
    }
    {
      "name": "amount_04",
      "type": [
        "null",
        {
          "type": "int",
          "connect.type": "int8"
        }
      ],
      "default": null
    }
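The best_fit results above follow from how the connector picks a type using the precision and scale that the JDBC driver reports for a NUMERIC column. The function below is a hedged sketch of that decision. It is based on my reading of `GenericDatabaseDialect` in the kafka-connect-jdbc source, the thresholds may differ between connector versions, and `best_fit_type` is a hypothetical name. It prints a Connect schema type. Connect's INT8 and INT16 are written to Avro as `int` with a `connect.type` annotation, which is why amount_04 appears as `"connect.type": "int8"`.

```shell
# Hypothetical sketch (not the connector's code): approximate how
# numeric.mapping=best_fit maps a JDBC NUMERIC column to a Connect type.
# Based on a reading of GenericDatabaseDialect; may vary between versions.
best_fit_type() {
  local precision=$1 scale=$2
  # Cut-off of 19 digits: the largest signed 64-bit value,
  # 9223372036854775807, has 19 digits, so not every 19-digit number fits.
  if (( precision < 19 )); then
    # -84 is the lowest scale treated as an integer (Oracle's minimum scale)
    if (( scale < 1 && scale >= -84 )); then
      # Integer: smallest Connect integer type for this many digits
      if (( precision > 9 )); then echo INT64
      elif (( precision > 4 )); then echo INT32
      elif (( precision > 2 )); then echo INT16
      else echo INT8
      fi
      return
    elif (( scale > 0 )); then
      # Any positive scale becomes a double
      echo FLOAT64
      return
    fi
  fi
  # Otherwise keep the Decimal logical type (serialized as bytes)
  echo DECIMAL
}
```

For example, `best_fit_type 5 2` prints `FLOAT64`, which Avro writes as `double` like amount_01 and amount_02, and `best_fit_type 5 0` prints `INT32`, a plain Avro `int` as seen for amount_03. Note that FLOAT64 is a binary floating-point type, so decimal values are converted rather than stored exactly.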
    

none

  • Create connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_postgres_13",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                      "connection.user": "connect_user",
                      "connection.password": "asgard",
                      "topic.prefix": "postgres-13-",
                      "numeric.mapping": "none",
                      "table.whitelist" : "demo.num_test",
                      "mode":"bulk",
                      "poll.interval.ms" : 3600000
                      }
              }'
    
  • Output data - all NUMERIC fields are bytes

    ksql> print 'postgres-13-num_test' from beginning;
    Format:AVRO
    1/7/19 4:27:52 PM UTC, null, {"txn_id": 42, "customer_id": 42, "amount_01": {"bytes": "'\u0011"}, "amount_02": {"bytes": "Øî"}, "amount_03": {"bytes": "d"}, "amount_04": {"bytes": "d"}}
    
  • Schema

    $ curl -s "http://localhost:8081/subjects/postgres-13-num_test-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("amount"))'
    {
      "name": "amount_01",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "amount_02",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "amount_03",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "amount_04",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
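With none, every NUMERIC column keeps the Decimal logical type, so this topic is identical to the default run. To predict the bytes that a given value will produce, work in the opposite direction. Take the unscaled value, which is the digits with the decimal point removed (for example 10001 for 100.01 at scale 2), and encode it in the fewest big-endian two's-complement bytes. As far as I know, this is what Connect's Decimal type produces via Java's `BigInteger.toByteArray()`. The helper below is a minimal sketch, assuming bash and values within 64 bits. `encode_connect_decimal` is a hypothetical name.

```shell
# Hypothetical helper (not part of Kafka Connect): print, as hex, the
# minimal big-endian two's-complement bytes for an unscaled decimal value.
# Assumes bash; limited to bash's signed 64-bit range.
encode_connect_decimal() {
  local unscaled=$1
  local nbytes=1
  # Find the fewest bytes whose signed range holds the value
  while (( nbytes < 8 )); do
    local half=$(( 1 << (8 * nbytes - 1) ))
    if (( unscaled >= -half && unscaled < half )); then break; fi
    nbytes=$(( nbytes + 1 ))
  done
  # Two's complement: a negative value wraps around modulo 2^(8*nbytes)
  local value=$unscaled
  if (( nbytes < 8 && value < 0 )); then
    value=$(( value + (1 << (8 * nbytes)) ))
  fi
  # Two hex digits per byte, zero-padded
  printf '%0*x\n' $(( nbytes * 2 )) "$value"
}
```

For example, `encode_connect_decimal 10001` prints `2711` (0x27 is `'` and 0x11 is shown as `\u0011`), and `encode_connect_decimal -10002` prints `d8ee`, the two bytes displayed as `Øî`.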
    

precision_only

  • Create connector

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_postgres_14",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                      "connection.user": "connect_user",
                      "connection.password": "asgard",
                      "topic.prefix": "postgres-14-",
                      "numeric.mapping": "precision_only",
                      "table.whitelist" : "demo.num_test",
                      "mode":"bulk",
                      "poll.interval.ms" : 3600000
                      }
              }'
    
  • Output data - zero-scale NUMERIC fields are INT, others are still bytes

    ksql> print 'postgres-14-num_test' from beginning;
    Format:AVRO
    1/7/19 4:05:57 PM UTC, null, {"txn_id": 42, "customer_id": 42, "amount_01": {"bytes": "'\u0011"}, "amount_02": {"bytes": "Øî"}, "amount_03": 100, "amount_04": 100}
    
  • Schema

    $ curl -s "http://localhost:8081/subjects/postgres-14-num_test-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name | contains("amount"))'
    {
      "name": "amount_01",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "amount_02",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
    {
      "name": "amount_03",
      "type": [
        "null",
        "int"
      ],
      "default": null
    }
    {
      "name": "amount_04",
      "type": [
        "null",
        {
          "type": "int",
          "connect.type": "int8"
        }
      ],
      "default": null
    }
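precision_only only changes zero-scale columns. A NUMERIC column with a scale of 0 and a precision below 19 becomes the smallest integer type that holds it. Any other column keeps the Decimal logical type and never becomes a DOUBLE, which matches amount_01 and amount_02 remaining bytes above. The function below is a hedged sketch. It is based on my reading of `GenericDatabaseDialect` in the kafka-connect-jdbc source, the behaviour may differ between connector versions, and `precision_only_type` is a hypothetical name.

```shell
# Hypothetical sketch (not the connector's code): approximate how
# numeric.mapping=precision_only maps a JDBC NUMERIC column to a Connect type.
# Based on a reading of GenericDatabaseDialect; may vary between versions.
precision_only_type() {
  local precision=$1 scale=$2
  if (( scale == 0 && precision < 19 )); then
    # Integer: smallest Connect integer type for this many digits
    if (( precision > 9 )); then echo INT64
    elif (( precision > 4 )); then echo INT32
    elif (( precision > 2 )); then echo INT16
    else echo INT8
    fi
  else
    # Non-zero scale (or too many digits): keep Decimal, serialized as bytes
    echo DECIMAL
  fi
}
```

For example, `precision_only_type 5 2` prints `DECIMAL`, while `precision_only_type 5 0` prints `INT32`.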
    

Footnote

If you define a field as having zero scale (e.g. NUMERIC(5)) but insert a value with a fractional part into it, the INSERT succeeds, but the connector skips that record when it polls the table and logs the following warning:

  [2019-01-07 16:04:44,456] WARN Ignoring record due to SQL error: (io.confluent.connect.jdbc.source.BulkTableQuerier)
  org.postgresql.util.PSQLException: Bad value for type BigDecimal : 100.04
  at org.postgresql.jdbc2.AbstractJdbc2ResultSet.scaleBigDecimal(AbstractJdbc2ResultSet.java:3131)
  at org.postgresql.jdbc2.AbstractJdbc2ResultSet.getBigDecimal(AbstractJdbc2ResultSet.java:2469)
  at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$columnConverterFor$20(GenericDatabaseDialect.java:1177)
  at io.confluent.connect.jdbc.source.SchemaMapping$FieldSetter.setField(SchemaMapping.java:158)
  at io.confluent.connect.jdbc.source.BulkTableQuerier.extractRecord(BulkTableQuerier.java:76)
  at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:309)
  at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
  at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

makampf commented May 15, 2020

Thank you for your work. I want to add that for Oracle, numeric.mapping=best_fit only maps a column such as NUMBER(p,s) to an integer when p < 19, so that the value fits in a primitive data type (see also the code).
