#zookeeper
docker run -d \
--net=host \
--name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=32181 \
confluentinc/cp-zookeeper:3.0.1
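#optional sanity check (assumes nc is installed on the host; behavior varies by nc flavor):
#zookeeper should answer imok on its client port
echo ruok | nc localhost 32181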
#kafka
docker run -d \
--net=host \
--name=kafka \
-e KAFKA_ZOOKEEPER_CONNECT=localhost:32181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092 \
confluentinc/cp-kafka:3.0.1
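#optional: give the broker a few seconds, then confirm it started cleanly before creating topics
docker logs kafka | grep -i "started"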
#create topic foo
docker run \
--net=host \
--rm confluentinc/cp-kafka:3.0.1 \
kafka-topics --create --topic foo --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:32181
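#optional: describe the topic to confirm it exists and check its partition/replica assignment
docker run --net=host --rm confluentinc/cp-kafka:3.0.1 \
kafka-topics --describe --topic foo --zookeeper localhost:32181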
#produce a sample message (the inner double quotes must be escaped to survive bash -c)
docker run --net=host --rm confluentinc/cp-kafka:3.0.1 bash -c "echo '{\"foo\":\"bar\"}' | kafka-console-producer --request-required-acks 1 --broker-list localhost:29092 --topic foo && echo 'done.'"
#read (for testing)
docker run \
--net=host \
--rm \
confluentinc/cp-kafka:3.0.1 \
kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --new-consumer --from-beginning --max-messages 42
#kafka rest
docker run -d \
--net=host \
--name=kafka-rest \
-e KAFKA_REST_ZOOKEEPER_CONNECT=localhost:32181 \
-e KAFKA_REST_LISTENERS=http://localhost:8082 \
confluentinc/cp-kafka-rest:3.0.1
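#optional: once the REST proxy is up, it should list the topics created above
curl http://localhost:8082/topics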
#connect internal topic: offsets
docker run \
--net=host \
--rm \
confluentinc/cp-kafka:3.0.1 \
kafka-topics --create --topic quickstart-offsets --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:32181
#connect internal topic: config
docker run \
--net=host \
--rm \
confluentinc/cp-kafka:3.0.1 \
kafka-topics --create --topic quickstart-config --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:32181
#connect internal topic: status
docker run \
--net=host \
--rm \
confluentinc/cp-kafka:3.0.1 \
kafka-topics --create --topic quickstart-status --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:32181
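#optional: confirm all three connect internal topics exist
docker run --net=host --rm confluentinc/cp-kafka:3.0.1 \
kafka-topics --list --zookeeper localhost:32181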
<SSH INTO DOCKER-MACHINE>
docker run -d \
--name=kafka-connect \
--net=host \
-e CONNECT_BOOTSTRAP_SERVERS=localhost:29092 \
-e CONNECT_REST_PORT=28082 \
-e CONNECT_GROUP_ID="quickstart" \
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-config" \
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-status" \
-e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \
-e CONNECT_ZOOKEEPER_CONNECT=localhost:32181 \
-v /Users/aphadke/kfc/kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-3.2.0-SNAPSHOT-package/share/java/kafka-connect-elasticsearch/:/usr/share/java/kafka-connect-elasticsearch \
-v /Users/aphadke/kfc/kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-3.2.0-SNAPSHOT-package/etc/kafka-connect-elasticsearch:/etc/kafka-connect-elasticsearch \
-v /tmp/quickstart/file:/tmp/quickstart \
confluentinc/cp-kafka-connect:3.0.1
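#optional (from inside the docker-machine): connect's REST API should respond once the
#worker is up; an empty list means no connectors yet
curl http://localhost:28082/connectors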
<RUN ON A DIFFERENT TERMINAL, with CONNECT_HOST exported first, e.g. export CONNECT_HOST=<docker-machine ip>>
curl -X POST -H "Content-Type: application/json" \
--data '{"name": "elasticsearch-sink", "config": {"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max":"1", "elasticsearch.cluster.name":"test-cluster","type.name":"kafka-connect", "elasticsearch.index.hosts":"192.168.99.100:9300","elasticsearch.index.prefix":"prefix","elasticsearch.document.name":"document","elasticsearch.bulk.size":"1","topics":"foo","schema.ignore": true, "key.ignore": true, "connection.url":"http://192.168.99.100:9200"}}' \
http://$CONNECT_HOST:28082/connectors
docker logs kafka-connect
#the connect logs show this error:
[2016-10-16 20:36:29,980] ERROR Task elasticsearch-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:356)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2016-10-16 20:36:29,980] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2016-10-16 20:36:29,980] INFO Stopping ElasticsearchSinkTask. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask)
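#why this happens: the sample message '{"foo":"bar"}' is raw JSON, but JsonConverter
#defaults to schemas.enable=true, which (per the error above) expects an envelope with
#"schema" and "payload" fields. one likely fix (untested here) is to disable schemas on
#the worker's converters by adding these env vars to the kafka-connect docker run:
#  -e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false \
#  -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
#alternatively, produce messages already wrapped in the envelope, e.g.:
docker run --net=host --rm confluentinc/cp-kafka:3.0.1 bash -c "echo '{\"schema\":{\"type\":\"struct\",\"optional\":false,\"fields\":[{\"field\":\"foo\",\"type\":\"string\",\"optional\":false}]},\"payload\":{\"foo\":\"bar\"}}' | kafka-console-producer --request-required-acks 1 --broker-list localhost:29092 --topic foo"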
#delete the failed connector, then re-create it
curl -X DELETE http://$CONNECT_HOST:28082/connectors/elasticsearch-sink
curl -X POST -H "Content-Type: application/json" \
--data '{"name": "elasticsearch-sink", "config": {"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max":"1", "elasticsearch.cluster.name":"test-cluster","type.name":"kafka-connect", "elasticsearch.index.hosts":"192.168.99.100:9300","elasticsearch.index.prefix":"prefix","elasticsearch.document.name":"document","elasticsearch.bulk.size":"1","topics":"foo","schema.ignore": true, "key.ignore": true, "connection.url":"http://192.168.99.100:9200"}}' \
http://$CONNECT_HOST:28082/connectors
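#optional: confirm the connector was re-created (the response lists connector names)
curl http://$CONNECT_HOST:28082/connectors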
#re-run kafka-connect with the same settings (remove the previous container first, e.g. docker rm -f kafka-connect)
docker run -d \
--name=kafka-connect \
--net=host \
-e CONNECT_BOOTSTRAP_SERVERS=localhost:29092 \
-e CONNECT_REST_PORT=28082 \
-e CONNECT_GROUP_ID="quickstart" \
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-config" \
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-status" \
-e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \
-e CONNECT_ZOOKEEPER_CONNECT=localhost:32181 \
-v /Users/aphadke/kfc/kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-3.2.0-SNAPSHOT-package/share/java/kafka-connect-elasticsearch/:/usr/share/java/kafka-connect-elasticsearch \
-v /Users/aphadke/kfc/kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-3.2.0-SNAPSHOT-package/etc/kafka-connect-elasticsearch:/etc/kafka-connect-elasticsearch \
-v /tmp/quickstart/file:/tmp/quickstart \
confluentinc/cp-kafka-connect:3.0.1
#create a file source connector that streams /tmp/quickstart/input.txt into topic q
curl -X POST \
-H "Content-Type: application/json" \
--data '{"name": "quickstart-file-source", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector", "tasks.max":"1", "topic":"q", "file": "/tmp/quickstart/input.txt"}}' \
http://$CONNECT_HOST:28082/connectors
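#note: inside the container the source reads /tmp/quickstart/input.txt, which the -v flag
#above maps to /tmp/quickstart/file/input.txt on the docker-machine host, so seed it there:
echo 'hello world' >> /tmp/quickstart/file/input.txt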
#read topic q (for testing)
docker run \
--net=host \
--rm \
confluentinc/cp-kafka:3.0.1 \
kafka-console-consumer --bootstrap-server localhost:29092 --topic q --new-consumer --from-beginning --max-messages 10