rmoff/0_update.adoc

Last active Feb 26, 2021
Example of Using Kafka Single Message Transform TimestampConverter
"_comment": "Use SMT to cast op_ts and current_ts to timestamp datatype (TimestampConverter is Kafka >=0.11 / Confluent Platform >=3.3). Format from https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html",
"transforms": "convert_op_ts,convert_current_ts",
"transforms.convert_op_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_op_ts.target.type": "Timestamp",
"transforms.convert_op_ts.field": "current_ts",
"transforms.convert_op_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
"transforms.convert_current_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_current_ts.target.type": "Timestamp",
"transforms.convert_current_ts.field": "op_ts",
"transforms.convert_current_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS"
@ShivaKumarPeddi commented Nov 19, 2018

Does this only work when we have "value.converter": "io.confluent.connect.avro.AvroConverter" and "value.converter.schemas.enable": "false"?

@lhedjazi commented Apr 17, 2019

Very helpful indeed!!

I am struggling with one thing concerning my Elasticsearch connector. "transforms.convert_op_ts.target.type" actually works with "string" and "unix", but not with the "Date" and "Timestamp" types. Here is my sink config:

```json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
    "topics": "test-elasticsearch-sink",
    "key.ignore": "true",
    "transforms": "InsertMessageTime,ConvertTimeValue",
    "transforms.InsertMessageTime.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertMessageTime.timestamp.field": "creationDate",
    "transforms.ConvertTimeValue.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.ConvertTimeValue.target.type": "Date",
    "transforms.ConvertTimeValue.field": "creationDate",
    "transforms.ConvertTimeValue.format": "yyyy-MM-dd HH:mm:ss.SSSSSS"
  }
}
```

I am looking to integrate it with Grafana and, as you know, the date field should be of "Date" type. I am getting the following error:
```
Caused by: org.apache.kafka.connect.errors.DataException: Java class class java.util.Date does not have corresponding schema type.
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:604)
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:668)
    at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:574)
    at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:324)
    at io.confluent.connect.elasticsearch.DataConverter.getPayload(DataConverter.java:181)
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:163)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:283)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:268)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:162)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    ... 10 more
```

Any idea on how to address this issue?

Thanks for your help
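A plausible workaround, given the report above that "string" and "unix" do work: with "value.converter.schemas.enable": "false" the JsonConverter has no schema through which to serialise a java.util.Date, which is exactly what the DataException says. Keeping "target.type": "string" avoids producing a java.util.Date at all, and an ISO-like format lets Elasticsearch's dynamic date detection (or an explicit index template) still map the field as a date. A minimal sketch of just the transform block, reusing the field names above; the format string is an assumption:

```json
"transforms": "InsertMessageTime,ConvertTimeValue",
"transforms.InsertMessageTime.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertMessageTime.timestamp.field": "creationDate",
"transforms.ConvertTimeValue.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimeValue.target.type": "string",
"transforms.ConvertTimeValue.field": "creationDate",
"transforms.ConvertTimeValue.format": "yyyy-MM-dd'T'HH:mm:ss"
```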

@mattferroni commented Jul 4, 2019

Hey @rmoff, I'm facing the same problem as @lhedjazi... any suggestion on how to fix this? :)
I'm using:
```
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
schema.ignore=true
```

@Amine27 commented Dec 24, 2019

Working great with JdbcSinkConnector.
Thank you.

@elabbassiwidad commented Jun 14, 2020

Hello, I'm generating data using an Avro schema, and this is the field I'm using to generate timestamps:

```json
{
  "name": "Timestamp",
  "type": "long",
  "logicalType": "timestamp-millis"
}
```

but in Kafka I get values like this (Unix epochs): 6028130266367963000, and I tried to use TimestampConverter to convert it to a readable format using this configuration:

```json
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
"transforms.TimestampConverter.target.type": "string",
"transforms.TimestampConverter.field": "Timestamp"
```

but then I get this as a result: 3414461-02-18 00:36:47.000234. Can anyone please advise on what to do?

@OpenCoderX commented Jun 27, 2020

I can only achieve 3 digits of sub-second precision using this method: "yyyy-MM-dd HH:mm:ss.SSSSSS" is truncated to "yyyy-MM-dd HH:mm:ss.SSS".

@sachdevap commented Jul 2, 2020

> (quoting @elabbassiwidad's question above about converting the 6028130266367963000 epoch values)

Were you able to correct this? I am hitting a wall with this.

@ashot-t commented Jul 2, 2020

The issue is related to micro/nanosecond precision, which doesn't exist in the SimpleDateFormat class.
The simplest solution could be dividing the value by 1000 or 1000000 before it reaches the connector, depending on whether you have microseconds or nanoseconds.
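There is no stock SMT that does this division, so one option is a tiny custom transform placed ahead of TimestampConverter in the chain. A minimal sketch, assuming the record value is a Struct; the class name, package, and config keys are invented for illustration:

```java
package example.smt; // hypothetical package, not part of the gist

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Divides a numeric epoch field down to milliseconds (divisor 1000 for
 * microseconds, 1000000 for nanoseconds) so that TimestampConverter,
 * which expects epoch milliseconds, produces sensible dates.
 */
public class ScaleEpochToMillis<R extends ConnectRecord<R>> implements Transformation<R> {

    private String field;  // name of the epoch field, e.g. "Timestamp"
    private long divisor;  // 1000 (micros) or 1000000 (nanos)

    @Override
    public void configure(Map<String, ?> props) {
        field = String.valueOf(props.get("field"));
        divisor = Long.parseLong(String.valueOf(props.get("divisor")));
    }

    @Override
    public R apply(R record) {
        if (record.value() instanceof Struct) {
            Struct value = (Struct) record.value();
            Long epoch = value.getInt64(field);
            if (epoch != null) {
                // In-place update keeps the sketch short; a production SMT
                // would build a new Struct rather than mutate the original.
                value.put(field, epoch / divisor);
            }
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("field", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                        "Name of the epoch field to scale")
                .define("divisor", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH,
                        "Divisor that brings the value down to milliseconds");
    }

    @Override
    public void close() { }
}
```

It would then be chained before the converter, e.g.:

```json
"transforms": "scale,TimestampConverter",
"transforms.scale.type": "example.smt.ScaleEpochToMillis",
"transforms.scale.field": "Timestamp",
"transforms.scale.divisor": "1000000",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.target.type": "string",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss"
```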

@eldontc commented Sep 29, 2020

I converted a timestamp successfully, but the time zone of the date isn't correct. The time zone is always UTC, even though my server isn't in that time zone. My server is set up for America/Sao_Paulo, but the date created by the Kafka Connect transform is in UTC.

```json
"transforms.ConvertTimeValue.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimeValue.target.type": "string",
"transforms.ConvertTimeValue.field": "dataHoraChegadaElasticUTC",
"transforms.ConvertTimeValue.format": "yyyy/MM/dd HH:mm:ss"
```

@ashot-t commented Sep 30, 2020

You can use the "db.timezone" parameter in the connector configuration; just set your timezone and you will have the correct date in your target database.
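For instance, in a JDBC sink configuration (a sketch; the connection details are placeholders):

```json
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"topics": "my-topic",
"db.timezone": "America/Sao_Paulo"
```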

@eldontc commented Oct 1, 2020

> You can use the "db.timezone" parameter in the connector configuration; just set your timezone and you will have the correct date in your target database.

I'm using an Elasticsearch sink connector. I couldn't find this "db.timezone" parameter in the docs. Actually, I think this parameter is exclusive to the JDBC connectors.

@ashot-t commented Oct 1, 2020

Sorry, I didn't notice the question was about the Elasticsearch sink connector.

@4whomtbts commented Jan 20, 2021

It works like a charm! Thanks a lot.
