@ashishb888
Last active December 13, 2019 07:01
Example of Kafka Connect file source and sink connectors
[hdpusr@hdpdev6 kafka_2.12-2.3.0]$ cat config/connect-standalone.properties
bootstrap.servers=localhost:7092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
[hdpusr@hdpdev6 kafka_2.12-2.3.0]$ cat config/connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
[hdpusr@hdpdev6 kafka_2.12-2.3.0]$ cat config/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
[hdpusr@hdpdev6 kafka_2.12-2.3.0]$
[hdpusr@hdpdev6 kafka_2.12-2.3.0]$ ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[hdpusr@hdpdev6 config]$ for i in {100..500} ; do echo "test$i" >> ../test.txt ; sleep 1 ; done
[hdpusr@hdpdev6 kafka_2.12-2.3.0]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:7092 --from-beginning --topic connect-test
{"schema":{"type":"string","optional":false},"payload":"test100"}
{"schema":{"type":"string","optional":false},"payload":"test101"}
{"schema":{"type":"string","optional":false},"payload":"test102"}
{"schema":{"type":"string","optional":false},"payload":"p0"}
[hdpusr@hdpdev6 kafka_2.12-2.3.0]$ tail -f test.sink.txt
test100
test101
test102
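
With schemas.enable=true, JsonConverter wraps every record in a schema/payload envelope, which is what the console consumer prints above, while the file sink writes only the payload. A minimal sketch for pulling the payload back out of such lines so they can be compared with test.sink.txt. The function name unwrap_payload is hypothetical (not part of Kafka), and the pattern assumes plain string payloads with no embedded double quotes or escapes:

```shell
#!/usr/bin/env bash
# Hypothetical helper: extract the "payload" string from JsonConverter
# envelope lines read on stdin. Assumes string payloads without
# embedded double quotes or backslash escapes.
unwrap_payload() {
  sed -n 's/.*"payload":"\([^"]*\)"}$/\1/p'
}

# Example input copied from the consumer output above.
printf '%s\n' '{"schema":{"type":"string","optional":false},"payload":"test100"}' | unwrap_payload
```

Piping the kafka-console-consumer.sh output through this function should give lines in the same form as the sink file.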
-------------------=-------------------
Without schema
[hdpusr@hdpdev6 kafka_2.12-2.3.0]$ cat config/connect-standalone.properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
#value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
[hdpusr@hdpdev6 kafka_2.12-2.3.0]$ cat config/connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/var/tmp/ct1.txt
topic=ct1
[hdpusr@hdpdev6 kafka_2.12-2.3.0]$
[hdpusr@hdpdev6 ~]$ for i in {1..3}; do echo "log line $i"; done > /var/tmp/ct1.txt
[hdpusr@hdpdev6 ~]$ cat /var/tmp/ct1.txt
log line 1
log line 2
log line 3
[hdpusr@hdpdev6 ~]$
[hdpusr@hdpdev6 kafka_2.12-2.3.0]$ $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:7092 --topic ct1 --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
3
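
GetOffsetShell prints one topic:partition:offset line per partition, and the awk step above adds up the third field. A small sketch of that summing step on its own, using made-up sample lines for a three-partition topic (the sample data and the function name sum_offsets are assumptions for illustration):

```shell
#!/usr/bin/env bash
# Sum the latest offsets (third colon-separated field) across partitions.
# "sum + 0" makes awk print 0 instead of an empty line on empty input.
sum_offsets() {
  awk -F ':' '{ sum += $3 } END { print sum + 0 }'
}

# Hypothetical GetOffsetShell output for a three-partition topic.
printf '%s\n' 'ct1:0:1' 'ct1:1:2' 'ct1:2:0' | sum_offsets
```

With --time -1 the offsets are the latest ones, so the sum matches the record count only while nothing has been removed from the start of the log by retention.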
[hdpusr@hdpdev6 kafka_2.12-2.3.0]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:7092 --from-beginning --topic ct1
"log line 1"
"log line 2"
"log line 3"
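
With schemas.enable=false, JsonConverter drops the envelope and serializes each string value as a bare JSON string, which is why the lines above are still wrapped in double quotes. A rough sketch for checking the consumed values against the source file contents. The helper name strip_json_quotes is hypothetical, and it assumes values without escaped characters:

```shell
#!/usr/bin/env bash
# Hypothetical helper: drop the surrounding double quotes from
# schemaless JsonConverter string values, one value per line.
strip_json_quotes() {
  sed -e 's/^"//' -e 's/"$//'
}

# Consumer output from above, compared against the lines written to the source file.
consumed=$(printf '%s\n' '"log line 1"' '"log line 2"' '"log line 3"' | strip_json_quotes)
expected=$(for i in 1 2 3; do echo "log line $i"; done)
if [ "$consumed" = "$expected" ]; then echo "match"; else echo "mismatch"; fi
```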