Kafka 2.7.0 install on macOS, MQTT connector

Quickstart

https://kafka.apache.org/documentation/#quickstart

Install kafka

$ brew install kafka
==> Summary
🍺  /usr/local/Cellar/kafka/2.7.0: 187 files, 65.4MB
==> Caveats
==> openjdk
For the system Java wrappers to find this JDK, symlink it with
  sudo ln -sfn /usr/local/opt/openjdk/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk.jdk

openjdk is keg-only, which means it was not symlinked into /usr/local,
because it shadows the macOS `java` wrapper.

If you need to have openjdk first in your PATH run:
  echo 'export PATH="/usr/local/opt/openjdk/bin:$PATH"' >> ~/.zshrc

For compilers to find openjdk you may need to set:
  export CPPFLAGS="-I/usr/local/opt/openjdk/include"

==> zookeeper
To have launchd start zookeeper now and restart at login:
  brew services start zookeeper
Or, if you don't want/need a background service you can just run:
  zkServer start
==> kafka
To have launchd start kafka now and restart at login:
  brew services start kafka
Or, if you don't want/need a background service you can just run:
  zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

Default ports

java  TCP *:54472 (LISTEN)
java  TCP *:2181 (LISTEN)
java  TCP *:54471 (LISTEN)
java  TCP *:9092 (LISTEN)
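
Of the ports listed, 2181 is the ZooKeeper client port and 9092 is the Kafka PLAINTEXT listener. A minimal sketch for checking them from a script, assuming `lsof` is available (it ships with macOS); the helper name `is_listening` is made up for this example:

```shell
#!/bin/sh
# is_listening PORT: succeed if some process listens on TCP PORT.
# Hypothetical helper; relies on lsof exiting non-zero when nothing matches.
# Without sudo, lsof only reports processes owned by the current user.
is_listening() {
  lsof -nP -iTCP:"$1" -sTCP:LISTEN >/dev/null 2>&1
}

# 2181 = ZooKeeper client port, 9092 = Kafka PLAINTEXT listener.
for port in 2181 9092; do
  if is_listening "$port"; then
    echo "port $port: listening"
  else
    echo "port $port: not listening"
  fi
done
```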

Config files

$ cd /usr/local/etc/kafka
connect-console-sink.properties
connect-console-source.properties
connect-distributed.properties
connect-file-sink.properties
connect-file-source.properties
connect-log4j.properties
connect-mirror-maker.properties
connect-standalone.properties
consumer.properties
log4j.properties
producer.properties
server.properties
tools-log4j.properties
trogdor.conf
zookeeper.properties

Kafka MQTT connectors

https://enfuse.io/a-diy-guide-to-kafka-connectors/
https://github.com/SINTEF-9012/kafka-mqtt-sink-connector
https://github.com/SINTEF-9012/kafka-mqtt-source-connector
https://github.com/johanvandevenne/kafka-connect-mqtt

Logs

$ cd /usr/local/var/lib/kafka-logs
$ cd /usr/local/var/lib/zookeeper/version-2

Links

http://kafka.apache.org/081/documentation/#quickstart
https://supergloo.com/kafka-connect/running-kafka-connect-standalone-vs-distributed-mode-examples/

Plugin kafka-connect-mqtt

https://github.com/johanvandevenne/kafka-connect-mqtt

Prerequisites (tested versions)

OS              Java                      Maven  EMQX
Mac OS Big Sur  openjdk version "15.0.1"  3.6.3  4.2.7

Build Jar connector files

$ git clone https://github.com/johanvandevenne/kafka-connect-mqtt
$ cd kafka-connect-mqtt/
$ mvn clean install

Copy jar files to java/kafka folder

$ mkdir -p /usr/local/Cellar/openjdk/15.0.1/kafka/
$ cp -r ./target/kafka-connect-mqtt-1.1.1-package/kafka-connect-mqtt /usr/local/Cellar/openjdk/15.0.1/kafka

  1. Standalone

Make the Kafka broker listen on all interfaces.

$ vim /usr/local/etc/kafka/server.properties
listeners = PLAINTEXT://0.0.0.0:9092
advertised.listeners = PLAINTEXT://127.0.0.1:9092

Config Source

$ vim /usr/local/etc/kafka/kafka-connect-mqtt-source.properties
name=kafka_connect_mqtt_source
connector.class=be.jovacon.kafka.connect.MQTTSourceConnector
tasks.max=1
mqtt.connector.kafka.name=kafka_connect_mqtt_source
mqtt.broker=tcp://0.0.0.0:1883
mqtt.clientID=mqtt_source_client_id
mqtt.topic=test
kafka.topic=connect-mqtt-kafka
#key.converter=org.apache.kafka.connect.storage.StringConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter.schemas.enable=false
#value.converter.schemas.enable=false
#key.serializer=org.apache.kafka.common.serialization.StringSerializer
#value.serializer=org.apache.kafka.common.serialization.StringSerializer
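
Kafka Connect typically does not reject a misspelled key in a connector properties file; it just ignores it and falls back to the default. A quick check for the expected keys can catch that early. A minimal sketch; `check_props` is a hypothetical helper, not part of Kafka:

```shell
#!/bin/sh
# check_props FILE KEY...: report every KEY that has no uncommented
# "KEY=value" line in FILE. Returns 1 if any key is missing.
check_props() {
  file=$1
  shift
  missing=0
  for key in "$@"; do
    # Compare the text before the first "=" (trimmed) with the wanted key.
    # Commented lines start with "#", so they never match.
    if ! awk -F= -v k="$key" '
           { name = $1; gsub(/^[ \t]+|[ \t]+$/, "", name) }
           name == k { found = 1 }
           END { exit !found }' "$file"; then
      echo "missing key: $key" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage:
#   check_props /usr/local/etc/kafka/kafka-connect-mqtt-source.properties \
#     name connector.class tasks.max mqtt.broker mqtt.topic kafka.topic
```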

Config Sink

$ vim /usr/local/etc/kafka/kafka-connect-mqtt-sink.properties
name=kafka_connect_mqtt_sink
connector.class=be.jovacon.kafka.connect.MQTTSinkConnector
tasks.max=1
mqtt.connector.kafka.name=kafka_connect_mqtt_sink
mqtt.broker=tcp://0.0.0.0:1883
mqtt.clientID=mqtt_sink_client_id
mqtt.topic=test2
topics=connect-mqtt-kafka
#topics.regex=downstream
#key.converter=org.apache.kafka.connect.storage.StringConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter.schemas.enable=false
#value.converter.schemas.enable=false

Config Connect workers

$ vim /usr/local/etc/kafka/connect-distributed.properties
rest.port=19005

plugin.path=/usr/local/Cellar/openjdk/15.0.1/kafka,/usr/local/etc/kafka/plugins,/usr/local/opt/java/

$ vim /usr/local/etc/kafka/connect-standalone.properties
rest.port=19005

plugin.path=/usr/local/Cellar/openjdk/15.0.1/kafka,/usr/local/etc/kafka/plugins,/usr/local/opt/java/

Start standalone

$ connect-standalone /usr/local/etc/kafka/connect-standalone.properties /usr/local/etc/kafka/kafka-connect-mqtt-source.properties /usr/local/etc/kafka/kafka-connect-mqtt-sink.properties

  2. Distributed

Delete topics

$ kafka-topics --zookeeper localhost:2181 --topic connect-configs --delete
$ kafka-topics --zookeeper localhost:2181 --topic connect-offsets --delete
$ kafka-topics --zookeeper localhost:2181 --topic connect-status --delete

Cluster, replication=3

$ cp /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/server-1.properties
$ vim /usr/local/etc/kafka/server-1.properties
broker.id=1
listeners = PLAINTEXT://0.0.0.0:9093
advertised.listeners = PLAINTEXT://127.0.0.1:9093
log.dirs=/tmp/kafka-logs-1


$ cp /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/server-2.properties
$ vim /usr/local/etc/kafka/server-2.properties
broker.id=2
listeners = PLAINTEXT://0.0.0.0:9094
advertised.listeners = PLAINTEXT://127.0.0.1:9094
log.dirs=/tmp/kafka-logs-2
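
A replication factor of 3 needs all three brokers running, so the two extra brokers have to be started alongside the default one. The copy-and-edit steps can also be scripted. A rough sketch, assuming the same ports and paths as above; `make_broker_props` is a hypothetical helper. It writes `log.dirs` because the stock server.properties sets `log.dirs`, which takes precedence over `log.dir`:

```shell
#!/bin/sh
# make_broker_props BASE ID PORT OUT: derive an extra broker config from BASE.
# Drops any existing values for the overridden keys and appends new ones,
# so commented-out defaults in BASE do not matter.
make_broker_props() {
  base=$1 id=$2 port=$3 out=$4
  grep -Ev '^[[:space:]]*(broker\.id|listeners|advertised\.listeners|log\.dirs?)[[:space:]]*=' \
    "$base" > "$out"
  cat >> "$out" <<EOF
broker.id=$id
listeners=PLAINTEXT://0.0.0.0:$port
advertised.listeners=PLAINTEXT://127.0.0.1:$port
log.dirs=/tmp/kafka-logs-$id
EOF
}

# Usage (then start each extra broker in the background):
#   make_broker_props /usr/local/etc/kafka/server.properties 1 9093 \
#     /usr/local/etc/kafka/server-1.properties
#   kafka-server-start /usr/local/etc/kafka/server-1.properties &
#   kafka-server-start /usr/local/etc/kafka/server-2.properties &
```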

Create topics

$ kafka-topics --create --bootstrap-server localhost:9092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact

$ kafka-topics --create --bootstrap-server localhost:9092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact

$ kafka-topics --create --bootstrap-server localhost:9092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact

$ kafka-topics --list --zookeeper localhost:2181
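
The three create commands differ only in topic name and partition count, so they can be driven from a short list. A sketch under that assumption; `run` and `create_connect_topics` are names invented here, and the script only prints the commands unless `DRY_RUN=0` is set:

```shell
#!/bin/sh
# run CMD...: print the command (DRY_RUN=1, the default here) or execute it.
run() {
  if [ "${DRY_RUN:-1}" = 1 ]; then
    echo "+ $*"
  else
    "$@"
  fi
}

# create_connect_topics BOOTSTRAP RF: create the three internal Connect topics.
# The name:partitions pairs mirror the commands above.
create_connect_topics() {
  for spec in connect-configs:1 connect-offsets:50 connect-status:10; do
    run kafka-topics --create --bootstrap-server "$1" \
      --topic "${spec%%:*}" --partitions "${spec##*:}" \
      --replication-factor "$2" --config cleanup.policy=compact
  done
}

create_connect_topics localhost:9092 3
```

Run it as `DRY_RUN=0 sh create-topics.sh` (file name is arbitrary) once all brokers are up.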

Start Distributed

$ connect-distributed /usr/local/etc/kafka/connect-distributed.properties

Load Sink

curl -X POST \
  http://127.0.0.1:19005/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "kafka-mqtt-sink-connector",
    "config":
    {
      "connector.class":"be.jovacon.kafka.connect.MQTTSinkConnector",
      "mqtt.topic":"test2",
      "topics":"connect-mqtt-kafka",
      "mqtt.clientID":"mqtt_sink_client_id",
      "mqtt.broker":"tcp://127.0.0.1:1883",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable":"false",
      "value.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter.schemas.enable":"false"
    }
}'

Load Source

curl -X POST \
  http://127.0.0.1:19005/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "kafka-mqtt-source-connector",
    "config":
    {
      "connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector",
      "mqtt.topic":"test",
      "kafka.topic":"connect-mqtt-kafka",
      "mqtt.clientID":"mqtt_source_client_id",
      "mqtt.broker":"tcp://127.0.0.1:1883",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable":"false",
      "value.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter.schemas.enable":"false"
    }
}'
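
POST /connectors returns a 409 conflict when a connector with that name already exists. The Connect REST API also accepts PUT /connectors/<name>/config, which creates the connector or updates it in place; the body is then the flat config map without the "name"/"config" wrapper. A minimal sketch using the same values as above; `source_config` and `upsert_connector` are hypothetical helper names:

```shell
#!/bin/sh
CONNECT_URL=${CONNECT_URL:-http://127.0.0.1:19005}

# source_config MQTT_TOPIC KAFKA_TOPIC: print the flat config map that
# PUT /connectors/<name>/config expects.
source_config() {
  cat <<EOF
{
  "connector.class": "be.jovacon.kafka.connect.MQTTSourceConnector",
  "mqtt.topic": "$1",
  "kafka.topic": "$2",
  "mqtt.clientID": "mqtt_source_client_id",
  "mqtt.broker": "tcp://127.0.0.1:1883",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
EOF
}

# upsert_connector NAME: read a config map on stdin and create or update NAME.
upsert_connector() {
  curl -s -X PUT "$CONNECT_URL/connectors/$1/config" \
    -H 'Content-Type: application/json' -d @-
}

# Usage (needs a running Connect worker):
#   source_config test connect-mqtt-kafka | upsert_connector kafka-mqtt-source-connector
```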

Check status

http://127.0.0.1:19005/connectors/kafka-mqtt-source-connector/status
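
The status endpoint returns JSON with a state for the connector and one per task (RUNNING, PAUSED, FAILED or UNASSIGNED). A rough sketch for pulling out the connector state without extra tools; `connector_state` is a made-up helper and it assumes the "connector" object comes before "tasks" in the response, so treat it as a convenience rather than a parser:

```shell
#!/bin/sh
# connector_state: read a /status response on stdin and print the first
# "state" value, which is the connector's own state under the assumption
# that "connector" precedes "tasks" in the JSON.
connector_state() {
  tr -d '[:space:]' | grep -o '"state":"[A-Z_]*"' | head -n 1 | cut -d '"' -f 4
}

# Usage:
#   curl -s http://127.0.0.1:19005/connectors/kafka-mqtt-source-connector/status \
#     | connector_state
```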

Kafka commands

List topics

$ kafka-topics --list --zookeeper localhost:2181

$ kafka-topics --list --bootstrap-server 127.0.0.1:9092

Create topic

$ kafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic connect-mqtt-kafka

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Add partitions

$ kafka-topics --zookeeper localhost:2181 --alter --topic my-topic --partitions 16

Delete topic

$ kafka-topics --zookeeper localhost:2181 --delete --topic topic

Topic details

$ kafka-topics --describe --zookeeper localhost:2181 --topic test

Under-replicated partitions

$ kafka-topics --zookeeper localhost:2181/kafka-cluster --describe --under-replicated-partitions

Post to Connect Interface

$ curl -s -X POST -H "Content-Type: application/json" http://127.0.0.1:19005/connectors -d '{"name":"kafka-connect-mqtt-source","config":{"connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector","tasks.max":"1","mqtt.broker":"tcp://localhost:1883", "mqtt.topic":"test","kafka.topic":"connect-mqtt-kafka"}}'

Connectors

$ curl --header "Content-Type: application/json" http://127.0.0.1:19005/connectors
["mqtt-source-connector"]

List connector plugins

http://127.0.0.1:19005/connector-plugins

Or

$ brew install jq
$ curl -s -XGET http://localhost:19005/connector-plugins|jq '.[].class'
"com.sintef.asam.MqttSinkConnector"
"com.sintef.asam.MqttSourceConnector"
"org.apache.kafka.connect.file.FileStreamSinkConnector"
"org.apache.kafka.connect.file.FileStreamSourceConnector"
"org.apache.kafka.connect.mirror.MirrorCheckpointConnector"
"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
"org.apache.kafka.connect.mirror.MirrorSourceConnector"

Check HTTP status

$ curl -s -o /dev/null -w '%{http_code}' http://localhost:19005/connectors
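
The worker takes a while to start, so posting connectors right after launching it can fail. The status-code check above can be turned into a small wait loop. A minimal sketch; `wait_for_connect` is a hypothetical name and the one-second interval is arbitrary:

```shell
#!/bin/sh
# wait_for_connect URL TRIES: poll URL about once per second until it
# answers with HTTP 200. Returns 1 if it never does within TRIES attempts.
wait_for_connect() {
  url=$1 tries=$2
  while [ "$tries" -gt 0 ]; do
    # curl prints 000 as the status code when the connection fails.
    code=$(curl -s --max-time 2 -o /dev/null -w '%{http_code}' "$url" || true)
    [ "$code" = 200 ] && return 0
    tries=$((tries - 1))
    [ "$tries" -gt 0 ] && sleep 1
  done
  return 1
}

# Usage:
#   wait_for_connect http://localhost:19005/connectors 30 && echo "Connect is up"
```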

Shutdown connector

curl -s -X PUT http://localhost:19005/connectors/mqtt-sink-connector/pause
curl -s -X GET http://localhost:19005/connectors/mqtt-sink-connector/status
curl -s -X DELETE http://localhost:19005/connectors/mqtt-sink-connector

curl -s -X PUT http://localhost:19005/connectors/mqtt-source-connector/pause
curl -s -X GET http://localhost:19005/connectors/mqtt-source-connector/status
curl -s -X DELETE http://localhost:19005/connectors/mqtt-source-connector

Consumer

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning --property print.key=true

Producer

New message

$ kafka-console-producer --broker-list localhost:9092 --topic test < messages.txt
$ kafka-console-producer --broker-list localhost:9092 --topic connect-mqtt-kafka < ../../../kafka/message.txt
$ kafka-console-producer --broker-list localhost:9092 --topic test
>{"Hello":"World!", "topic":"test"}
>Any message
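
The consumer above prints keys, but the console producer sends a null key unless told to parse one. With `--property parse.key=true` it splits each input line on the key separator, a tab by default. A small sketch that generates keyed test lines; the `sensor-N` keys and the helper names are invented for illustration:

```shell
#!/bin/sh
# make_messages N: print N lines of the form key<TAB>value.
make_messages() {
  i=1
  while [ "$i" -le "$1" ]; do
    printf 'sensor-%d\t{"seq":%d}\n' "$i" "$i"
    i=$((i + 1))
  done
}

# send_keyed TOPIC: read key<TAB>value lines on stdin and produce them.
# Needs a running broker, so it is only defined here, not called.
send_keyed() {
  kafka-console-producer --broker-list localhost:9092 --topic "$1" \
    --property parse.key=true
}

# Usage:
#   make_messages 3 | send_keyed test
```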

Mosquitto client

$ brew install mosquitto
$ cd /usr/local/Cellar/mosquitto/2.0.7/bin/
$ mosquitto_pub -t 'test' -m 'helloWorld'

Performance

$ kafka-producer-perf-test --topic position-reports --throughput 10000 --record-size 300 --num-records 20000 --producer-props bootstrap.servers="localhost:9092"