

@rmoff
Last active February 26, 2021 12:03
Example of Using Kafka Single Message Transform TimestampConverter
"_comment": "Use SMT to cast op_ts and current_ts to timestamp datatype (TimestampConverter is Kafka >=0.11 / Confluent Platform >=3.3). Format from https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html",
"transforms": "convert_op_ts,convert_current_ts",
"transforms.convert_op_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_op_ts.target.type": "Timestamp",
"transforms.convert_op_ts.field": "current_ts",
"transforms.convert_op_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
"transforms.convert_current_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_current_ts.target.type": "Timestamp",
"transforms.convert_current_ts.field": "op_ts",
"transforms.convert_current_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS"
@ShivaKumarPeddi

Does this only work when we have "value.converter": "io.confluent.connect.avro.AvroConverter" and "value.converter.schemas.enable": "false"?

@lhedjazi

lhedjazi commented Apr 17, 2019

Very helpful indeed!!

I am struggling with one thing concerning my Elasticsearch connector. The "transforms.convert_op_ts.target.type" actually works with "string" and "unix" but not with the "Date" and "Timestamp" types. Here is my sink config:

{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
    "topics": "test-elasticsearch-sink",
    "key.ignore": "true",
    "transforms": "InsertMessageTime,ConvertTimeValue",
    "transforms.InsertMessageTime.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertMessageTime.timestamp.field": "creationDate",
    "transforms.ConvertTimeValue.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.ConvertTimeValue.target.type": "Date",
    "transforms.ConvertTimeValue.field": "creationDate",
    "transforms.ConvertTimeValue.format": "yyyy-MM-dd HH:mm:ss.SSSSSS"
  }
}

I am looking to integrate it with Grafana and as you know the date field should be of "Date" type. I am getting the following error:
Caused by: org.apache.kafka.connect.errors.DataException: Java class class java.util.Date does not have corresponding schema type.
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:604)
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:668)
at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:574)
at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:324)
at io.confluent.connect.elasticsearch.DataConverter.getPayload(DataConverter.java:181)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:163)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:283)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:268)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:162)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more

Any idea on how to address this issue?

Thanks for your help
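The error is thrown by JsonConverter: with "value.converter.schemas.enable": "false" there is no schema attached to the record, so when the SMT produces a java.util.Date the converter has no schema type to fall back on. A sketch of one possible workaround, assuming Elasticsearch can map an ISO-8601 string to a date via dynamic mapping or an index template (this is an assumption, not something confirmed in the thread):

"transforms.ConvertTimeValue.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimeValue.field": "creationDate",
"transforms.ConvertTimeValue.target.type": "string",
"transforms.ConvertTimeValue.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"

The alternative is to keep "Date"/"Timestamp" as the target type but use a converter that carries a schema (for example JsonConverter with schemas enabled, or Avro with Schema Registry), so the logical timestamp type survives serialisation.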

@mattferroni

Hey @rmoff, I'm facing the same problem as @lhedjazi... any suggestion on how to fix this? :)
I'm using:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
schema.ignore=true

@Amine27

Amine27 commented Dec 24, 2019

Working great with JdbcSinkConnector.
Thank you.

@elabbassiwidad

Hello, I'm generating data using an Avro schema, and this is the field I'm using to generate timestamps:
{
"name": "Timestamp",
"type": "long",
"logicalType": "timestamp-millis"
}

but in Kafka I get values like this (Unix epochs): 6028130266367963000, and I tried to use TimestampConverter to convert it to a readable format using this configuration:
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSSSSS"
"transforms.TimestampConverter.target.type": "string"
"transforms.TimestampConverter.field ": "Timestamp"

but then I get this as a result: 3414461-02-18 00:36:47.000234. Can anyone please advise on what to do?

@OpenCoderX

I can only achieve 3 digits of subsecond precision using this method. "yyyy-MM-dd HH:mm:ss.SSSSSS" is truncated to "yyyy-MM-dd HH:mm:ss.SSS"
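That matches how the underlying classes behave: TimestampConverter formats with SimpleDateFormat over java.util.Date, and a Date only carries millisecond precision, so "SSSSSS" just zero-pads the millisecond value rather than printing real microseconds. A small standalone Java sketch (nothing Connect-specific) that shows the effect:

import java.text.SimpleDateFormat;
import java.util.Date;

public class SubSecondDemo {
    public static void main(String[] args) {
        // java.util.Date stores milliseconds only, and SimpleDateFormat's 'S'
        // pattern letter means "millisecond", so six S's zero-pad that value
        // to six digits instead of producing genuine microseconds.
        Date d = new Date(1_600_000_000_123L); // epoch millis ending in .123
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(d));
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS").format(d));
        // the second line ends in ".000123", not a real microsecond value
    }
}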

@sachdevap

Hello, I'm generating data using an Avro schema, and this is the field I'm using to generate timestamps:
{ "name": "Timestamp", "type": "long", "logicalType": "timestamp-millis" }
but in Kafka I get values like this (Unix epochs): 6028130266367963000, and I tried to use TimestampConverter to convert it to a readable format using this configuration:
"transforms": "TimestampConverter", "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSSSSS", "transforms.TimestampConverter.target.type": "string", "transforms.TimestampConverter.field": "Timestamp"

but then I get this as a result: 3414461-02-18 00:36:47.000234. Can anyone please advise on what to do?

Were you able to correct this? I am hitting a wall this time.

@ashot-t

ashot-t commented Jul 2, 2020

The issue is related to micro/nanoseconds, which don't exist in the SimpleDateFormat class.
The simplest solution could be dividing the value by 1,000 or 1,000,000 before the connector, depending on whether you have microseconds or nanoseconds.
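A sketch of that idea in plain Java, run wherever the value is produced before it reaches Connect; the variable names and sample values are made up for illustration, and it assumes TimestampConverter will interpret the numeric field as epoch milliseconds:

public class EpochToMillis {
    public static void main(String[] args) {
        long epochMicros = 1_600_000_000_123_456L;     // if the source emits microseconds
        long epochNanos  = 1_600_000_000_123_456_789L; // if the source emits nanoseconds

        // Scale down to epoch milliseconds before the record is written to Kafka.
        long millisFromMicros = epochMicros / 1_000L;
        long millisFromNanos  = epochNanos / 1_000_000L;

        System.out.println(millisFromMicros); // 1600000000123
        System.out.println(millisFromNanos);  // 1600000000123
    }
}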

@eldontc

eldontc commented Sep 29, 2020

I converted a timestamp successfully, but the time zone of the date isn't correct. The time zone is always UTC, even though my server isn't in that time zone. My server is set to America/Sao_Paulo, but the date created by the Kafka Connect transform is in the UTC time zone.
"transforms.ConvertTimeValue.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimeValue.target.type": "string",
"transforms.ConvertTimeValue.field": "dataHoraChegadaElasticUTC",
"transforms.ConvertTimeValue.format": "yyyy/MM/dd HH:mm:ss"

@ashot-t

ashot-t commented Sep 30, 2020

You can use the "db.timezone" parameter in the connector configuration: just set your time zone and you will get the correct date in your target database.
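For reference, in a JDBC sink configuration that parameter sits alongside the other connection settings, something like this (the connection details here are placeholders):

"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://localhost:5432/demo",
"topics": "my-topic",
"db.timezone": "America/Sao_Paulo"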

@eldontc

eldontc commented Oct 1, 2020

You can use the "db.timezone" parameter in the connector configuration: just set your time zone and you will get the correct date in your target database.

I'm using an Elasticsearch sink connector. I couldn't find the "db.timezone" parameter in its docs. Actually, I think db.timezone is exclusive to the JDBC connectors.

@ashot-t

ashot-t commented Oct 1, 2020

Sorry, I didn't notice the question was about the Elasticsearch sink connector.

@4whomtbts

It works like a charm! Thank you a lot.
