Single Message Transform to add topic, partition, and offset to sink message
Created Jul 15, 2019
curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/sink-elastic-orders-00/config \
    -d '{
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "orders",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "kafkaconnect",
        "key.ignore": "true",
        "schema.ignore": "false",
        "errors.tolerance": "all",
        "transforms": "addTS,InsertTopic,InsertOffset,InsertPartition",
        "transforms.addTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addTS.timestamp.field": "op_ts",
        "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertTopic.topic.field": "source_topic",
        "transforms.InsertOffset.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertOffset.offset.field": "source_offset",
        "transforms.InsertPartition.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertPartition.partition.field": "source_partition"
    }'
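To make the effect of the four transforms concrete, here is a rough Python sketch of what the chain does to each record value before it reaches Elasticsearch. It is only a simulation, not Kafka Connect code: the function name, the sample order fields, and the representation of the timestamp as epoch milliseconds are all assumptions made for illustration. The inserted field names (`op_ts`, `source_topic`, `source_offset`, `source_partition`) are the ones from the config above.

```python
from typing import Any, Dict


def apply_insert_field_transforms(
    value: Dict[str, Any],
    topic: str,
    partition: int,
    offset: int,
    timestamp_ms: int,
) -> Dict[str, Any]:
    """Hypothetical stand-in for the addTS, InsertTopic, InsertOffset and
    InsertPartition transforms: copy the record value and add the record's
    metadata under the field names used in the connector config."""
    enriched = dict(value)                    # leave the input value untouched
    enriched["op_ts"] = timestamp_ms          # addTS: record timestamp
    enriched["source_topic"] = topic          # InsertTopic: topic name
    enriched["source_offset"] = offset        # InsertOffset: offset in the partition
    enriched["source_partition"] = partition  # InsertPartition: partition number
    return enriched


if __name__ == "__main__":
    # Sample order payload; these fields are made up for the example.
    original = {"order_id": 42, "amount": 9.99}
    print(apply_insert_field_transforms(original, "orders", 0, 1234, 1563148800000))
```

The original payload fields are kept as they are, and the four metadata fields are added alongside them, so each Elasticsearch document can be traced back to the exact Kafka record it came from.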
Is there a way to use a message field value for the Elasticsearch index name? That is, if I push old historical data into Kafka, the ElasticsearchSinkConnector should be able to write it to the right ES index, "indexPrefix-{messageCreatedDate}". I am not sure that the Timestamp transforms can be used in this case.