Skip to content

Instantly share code, notes, and snippets.

View andrewstevenson's full-sized avatar

Andrew Stevenson andrewstevenson

View GitHub Profile
{
"type": "record",
"name": "example.nasty",
"fields": [
{
"name": "elements",
"type": {
"type": "array",
"items" : {
# Start the Avro console producer against the broker at localhost:9092, publishing to
# the topic `nasty`. The inline value schema is a record with one field, `elements`:
# an array of `example.nasty.element` records, each holding a string `value` and a
# `children` array of the same element type (i.e. the schema is self-referencing).
# The JSON lines that follow the command are presumably the records typed into the producer.
./bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic nasty \
--property value.schema='{"type":"record","name":"example.nasty","fields":[{"name":"elements","type":{"type":"array","items":{"type":"record","name":"example.nasty.element","fields":[{"name":"value","type":"string"},{"name":"children","type":{"type":"array","items":"example.nasty.element"}}]}}}]}'
{"elements":[{"value":"a","children":[{"value":"a-a","children":[]},{"value":"a-b","children":[]}]},{"value":"b","children":[{"value":"b-a","children":[]},{"value":"b-b","children":[]}]}]}
{"elements":[{"value":"a","children":[{"value":"a-a","children":[]},{"value":"a-b","children":[]}]},{"value":"b","children":[{"value":"b-a","children":[]},{"value":"b-b","children":[]}]}]}
...
# Kafka Connect configuration for the connector named cassandra-sink-orders.
name=cassandra-sink-orders
# Connector implementation: the stream-reactor Cassandra sink connector class.
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
# Kafka topic handed to the sink.
topics=orders-topic
# Routing query: take every field from orders-topic and insert it into the orders table.
connect.cassandra.export.route.query=INSERT INTO orders SELECT * FROM orders-topic
# Cassandra connection settings (host, port, keyspace).
connect.cassandra.contact.points=localhost
connect.cassandra.port=9042
connect.cassandra.key.space=demo
# NOTE(review): credentials are stored in plain text and look like stock defaults;
# replace them for anything beyond a local demo.
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
# Download the stream-reactor 0.2 release archive.
wget https://github.com/datamountaineer/stream-reactor/releases/download/v0.2/stream-reactor-release-0.2-3.0.0.tar.gz
# Unpack it into the current directory.
tar -xvf stream-reactor-release-0.2-3.0.0.tar.gz
# Put the Cassandra connector jar on the CLASSPATH.
# Use $(pwd) (command substitution of the `pwd` builtin) rather than the Makefile-style
# $(PWD): in a shell, $(PWD) tries to run a command called `PWD`, fails, and leaves the
# path starting at "/" instead of the current directory.
export CLASSPATH="$(pwd)/stream-reactor-release-0.2-3.0.0/kafka-connect-cassandra-0.2-3.0.0-all.jar"
# Start a Kafka Connect worker in distributed mode with the Avro worker properties.
# Both paths are relative, so this is presumably run from the Confluent install root.
bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
# Ask the Connect REST endpoint on localhost:8083 for its connector plugins;
# the JSON array below is the response, one entry per connector class.
curl http://localhost:8083/connector-plugins
[{"class":"io.confluent.connect.hdfs.tools.SchemaSourceConnector"},
{"class":"com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector"},
{"class":"io.confluent.connect.jdbc.JdbcSourceConnector"},
{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector"},
{"class":"io.confluent.connect.hdfs.HdfsSinkConnector"},
{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector"},
{"class":"com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"}]
# Create the target folder for Cassandra; -p keeps the step re-runnable if it already exists.
mkdir -p cassandra
# Download Cassandra 3.5 from the Apache archive, the permanent home for past releases
# (download mirrors normally carry only current releases).
wget https://archive.apache.org/dist/cassandra/3.5/apache-cassandra-3.5-bin.tar.gz
# Extract the archive into the cassandra folder.
tar -xvf apache-cassandra-3.5-bin.tar.gz -C cassandra
#Set up environment variables
-- Keyspace used by the demo. IF NOT EXISTS makes the script safe to re-run.
-- NOTE(review): replication_factor 3 presumes a cluster of at least three nodes; on a
-- single local node, requests above consistency level ONE would be unavailable - confirm.
CREATE KEYSPACE IF NOT EXISTS demo
    WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3};

USE demo;

-- Orders table: partitioned by id, rows within a partition ordered by created (ascending).
-- NOTE(review): price is a float, which is inexact for monetary values; kept as-is so the
-- column types stay unchanged for whatever writes to this table - consider decimal.
CREATE TABLE IF NOT EXISTS orders (
    id int,
    created varchar,
    product varchar,
    qty int,
    price float,
    PRIMARY KEY (id, created)
) WITH CLUSTERING ORDER BY (created ASC);
# Create the connector cassandra-sink-orders with the kafka-connect-cli jar, feeding it
# the connector properties file on standard input.
java -jar kafka-connect-cli-0.5-all.jar create cassandra-sink-orders < cassandra-sink-distributed-orders.properties
#Connector `cassandra-sink-orders`:
name=cassandra-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders-topic
connect.cassandra.export.route.query=INSERT INTO orders SELECT * FROM orders-topic
connect.cassandra.contact.points=localhost