Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
{
  "name": "es-sink-soe-all-01",
  "config": {
    "_comment.converters": "-- Standard converter settings -- these can instead be set globally in the worker config --",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "_comment.elasticsearch": "--- Elasticsearch-specific config ---",
    "_comment.connection.url": "Elasticsearch server address",
    "connection.url": "http://localhost:9200",
    "_comment.type.name": "Elasticsearch mapping type name. It is created automatically if it doesn't exist",
    "type.name": "kafka-connect",
    "_comment.topics": "Which topics to stream data from into Elasticsearch",
    "topics": "ora-ogg-SOE-ADDRESSES-avro, ora-ogg-SOE-CARD_DETAILS-avro, ora-ogg-SOE-CUSTOMERS-avro, ora-ogg-SOE-INVENTORIES-avro, ora-ogg-SOE-LOGON-avro, ora-ogg-SOE-ORDERENTRY_METADATA-avro, ora-ogg-SOE-ORDERS-avro, ora-ogg-SOE-ORDER_ITEMS-avro, ora-ogg-SOE-PRODUCT_DESCRIPTIONS-avro, ora-ogg-SOE-PRODUCT_INFORMATION-avro, ora-ogg-SOE-WAREHOUSES-avro",
    "_comment.topic.index.map": "Elasticsearch indices have to have lowercase names, so we need to map them all here",
    "topic.index.map": "ora-ogg-SOE-ADDRESSES-avro:soe.addresses, ora-ogg-SOE-CARD_DETAILS-avro:soe.card_details, ora-ogg-SOE-CUSTOMERS-avro:soe.customers, ora-ogg-SOE-INVENTORIES-avro:soe.inventories, ora-ogg-SOE-LOGON-avro:soe.logon, ora-ogg-SOE-ORDERENTRY_METADATA-avro:soe.orderentry_metadata, ora-ogg-SOE-ORDERS-avro:soe.orders, ora-ogg-SOE-ORDER_ITEMS-avro:soe.order_items, ora-ogg-SOE-PRODUCT_DESCRIPTIONS-avro:soe.product_descriptions, ora-ogg-SOE-PRODUCT_INFORMATION-avro:soe.product_information, ora-ogg-SOE-WAREHOUSES-avro:soe.warehouses",
    "_comment.transforms": "Use SMT to cast op_ts and current_ts to timestamp datatype (TimestampConverter is Kafka >=0.11 / Confluent Platform >=3.3). Format from https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html",
    "transforms": "convert_op_ts,convert_current_ts",
    "transforms.convert_op_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convert_op_ts.target.type": "Timestamp",
    "transforms.convert_op_ts.field": "op_ts",
    "transforms.convert_op_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
    "transforms.convert_current_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convert_current_ts.target.type": "Timestamp",
    "transforms.convert_current_ts.field": "current_ts",
    "transforms.convert_current_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS"
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.