Skip to content

Instantly share code, notes, and snippets.

View andrewstevenson's full-sized avatar

Andrew Stevenson andrewstevenson

View GitHub Profile
bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders-topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"}, {"name":"created", "type": "string"}, {"name":"product", "type": "string"}, {"name":"price", "type": "double"}]}'
use demo;
SELECT * FROM orders;
id | created | price | product | qty
----+---------------------+-------+-----------------------------------+-----
1 | 2016-05-06 13:53:00 | 94.2 | OP-DAX-P-20150201-95.7 | 100
2 | 2016-05-06 13:54:00 | 99.5 | OP-DAX-C-20150201-100 | 100
3 | 2016-05-06 13:55:00 | 10000 | FU-DATAMOUNTAINEER-20150201-100 | 500
4 | 2016-05-06 13:56:00 | 150 | FU-KOSPI-C-20150201-100 | 200
/**
* Write SinkRecords to Cassandra asynchronously, as JSON.
*
* @param records A list of SinkRecords from Kafka Connect to write.
* @return boolean indicating a successful write.
**/
private def insert(records: Seq[SinkRecord]) = {
val processors = Runtime.getRuntime.availableProcessors()
val executor = Executors.newFixedThreadPool(2 * processors)
try {
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2}
{"id": 2, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "price": 99.5}
{"id": 3, "created": "2016-05-06 13:55:00", "product": "FU-DATAMOUNTAINEER-20150201-100", "price": 10000}
{"id": 4, "created": "2016-05-06 13:56:00", "product": "FU-KOSPI-C-20150201-100", "price": 150}
[2016-08-29 12:02:16,368] INFO Setting newly assigned partitions [orders-topic-0] for group connect-cassandra-sink-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
[2016-08-29 12:02:16,423] INFO Received 4 records. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:96)
[2016-08-29 12:02:16,484] INFO Processed 4 records. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:138)
[2016-08-29 11:41:26,080] INFO CassandraConfigSink values:
connect.cassandra.key.space = demo
connect.cassandra.export.route.query = INSERT INTO orders SELECT * FROM orders-topic
connect.cassandra.trust.store.path =
connect.cassandra.ssl.client.cert.auth = false
connect.cassandra.password = [hidden]
connect.cassandra.key.store.password = [hidden]
connect.cassandra.username = cassandra
connect.cassandra.error.policy = THROW
connect.cassandra.contact.points = localhost
INSERT INTO <target table> SELECT <fields> FROM <source topic>
#Insert mode, select all fields from topicA and write to tableA
INSERT INTO tableA SELECT * FROM topicA
#Insert mode, select 3 fields and rename from topicB and write to tableB
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB
routes.query = INSERT INTO transactions SELECT field1 as column1, field2 as column2, field3 FROM topic_A;INSERT INTO transactions SELECT fieldA1 as column1, fieldA2 as column2, fieldC FROM topic_B
./bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic nasty \
--property value.schema='{"type":"record","name":"example.nasty","fields":[{"name":"elements","type":{"type":"array","items":{"type":"record","name":"example.nasty.element","fields":[{"name":"value","type":"string"},{"name":"children","type":{"type":"array","items":"example.nasty.element"}}]}}}]}'
{"elements":[{"value":"a","children":[{"value":"a-a","children":[]},{"value":"a-b","children":[]}]},{"value":"b","children":[{"value":"b-a","children":[]},{"value":"b-b","children":[]}]}]}
{"elements":[{"value":"a","children":[{"value":"a-a","children":[]},{"value":"a-b","children":[]}]},{"value":"b","children":[{"value":"b-a","children":[]},{"value":"b-b","children":[]}]}]}