This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
bin/kafka-avro-console-producer \ | |
--broker-list localhost:9092 --topic orders-topic \ | |
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"}, {"name":"created", "type": "string"}, {"name":"product", "type": "string"}, {"name":"price", "type": "double"}]}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use demo; | |
SELECT * FROM orders; | |
id | created | price | product | qty | |
----+---------------------+-------+-----------------------------------+----- | |
1 | 2016-05-06 13:53:00 | 94.2 | OP-DAX-P-20150201-95.7 | 100 | |
2 | 2016-05-06 13:54:00 | 99.5 | OP-DAX-C-20150201-100 | 100 | |
3 | 2016-05-06 13:55:00 | 10000 | FU-DATAMOUNTAINEER-20150201-100 | 500 | |
4 | 2016-05-06 13:56:00 | 150 | FU-KOSPI-C-20150201-100 | 200 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Write SinkRecords to Cassandra (aSync) in Json | |
* | |
* @param records A list of SinkRecords from Kafka Connect to write. | |
* @return boolean indication successful write. | |
**/ | |
private def insert(records: Seq[SinkRecord]) = { | |
val processors = Runtime.getRuntime.availableProcessors() | |
val executor = Executors.newFixedThreadPool(2 * processors) | |
try { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2} | |
{"id": 2, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "price": 99.5} | |
{"id": 3, "created": "2016-05-06 13:55:00", "product": "FU-DATAMOUNTAINEER-20150201-100", "price": 10000} | |
{"id": 4, "created": "2016-05-06 13:56:00", "product": "FU-KOSPI-C-20150201-100", "price": 150} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[2016-08-29 12:02:16,368] INFO Setting newly assigned partitions [orders-topic-0] for group connect-cassandra-sink-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219) | |
[2016-08-29 12:02:16,423] INFO Received 4 records. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:96) | |
[2016-08-29 12:02:16,484] INFO Processed 4 records. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:138) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[2016-08-29 11:41:26,080] INFO CassandraConfigSink values: | |
connect.cassandra.key.space = demo | |
connect.cassandra.export.route.query = INSERT INTO orders SELECT * FROM orders-topic | |
connect.cassandra.trust.store.path = | |
connect.cassandra.ssl.client.cert.auth = false | |
connect.cassandra.password = [hidden] | |
connect.cassandra.key.store.password = [hidden] | |
connect.cassandra.username = cassandra | |
connect.cassandra.error.policy = THROW | |
connect.cassandra.contact.points = localhost |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
INSERT INTO <target table> SELECT <fields> FROM <source topic> | |
#Insert mode, select all fields from topicA and write to tableA | |
INSERT INTO tableA SELECT * FROM topicA | |
#Insert mode, select 3 fields and rename from topicB and write to tableB | |
INSERT INTO tableB SELECT x AS a, y AS b and z AS c FROM topicB |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
routes.query = INSERT INTO transactions SELECT field1 as column1, field2 as column2, field3 FROM topic_A;INSERT INTO transactions SELECT fieldA1 as column1, fieldA2 as column2, fieldC FROM topic_B |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
./bin/kafka-avro-console-producer \ | |
--broker-list localhost:9092 --topic nasty \ | |
--property value.schema='{type:record,name:example.nasty,fields:[{name:elements,type:{type:array,items:{type:record,name:example.nasty.element,fields:[{name:value,type:string},{name:children,type:{type:array,items:example.nasty.element}}]}}}]}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"elements":[{"value":"a","children":[{"value":"a-a","children":[]},{"value":"a-b","children":[]}]},{"value":"b","children":[{"value":"b-a","children":[]},{"value":"b-b","children":[]}]}]} | |
{"elements":[{"value":"a","children":[{"value":"a-a","children":[]},{"value":"a-b","children":[]}]},{"value":"b","children":[{"value":"b-a","children":[]},{"value":"b-b","children":[]}]}]} |