Skip to content

Instantly share code, notes, and snippets.

@rkaneko
Last active July 27, 2018 07:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rkaneko/ef798c0c7bce868ad0a4f91ccf9188de to your computer and use it in GitHub Desktop.
Save rkaneko/ef798c0c7bce868ad0a4f91ccf9188de to your computer and use it in GitHub Desktop.

Usage

Boot tmp Kafka

$ docker run --rm -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 spotify/kafka

Create topic

  • Download Kafka binary
# list existing topics
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181

# create topic "test"
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Consume records for the topic

# consume messages for the topic "test"
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-biginning
from kafka import KafkaProducer
from kafka.errors import (
NoBrokersAvailable,
KafkaTimeoutError
)
import logging
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(message)s')
def main():
producer = None
try:
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
acks=1,
retries=2,
retry_backoff_ms=100
)
except NoBrokersAvailable as e:
log.warning(e)
finally:
if producer:
producer.close()
if producer is None:
log.info("Cannot use Kafka.")
return
try:
for index in range(100):
print("index {0}: sending...".format(index))
b_msg = str.encode("message: {0}".format(index))
producer.send('test', b_msg)
producer.flush(timeout=5)
print("Flush records")
except KafkaTimeoutError as err:
print(err)
finally:
producer.close()
if __name__ == '__main__':
main()
@rkaneko
Copy link
Author

rkaneko commented Jul 27, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment