@jakekdodd
Created February 20, 2015 16:30
Example of kafka-python producer using Avro

Notes:

  • This uses the Python lipsum lorem-ipsum generator; you can replace g.generate_sentence() with your own string or generator of choice.
  • Make sure to replace schema_path in the script with the path to your own copy of user.avsc.
  • No guarantees that this is the best way to loop over records and publish them to Kafka; it's just a demo. Before adapting this script, take a look at the Avro documentation. For instance, the DatumWriter is stateless for a given schema, so it does not need to be instantiated for every record.

Instructions

  1. Create a Kafka topic to test this on (default in the script is 'test')
  2. Replace schema_path with the path to user.avsc
  3. Open up a console Kafka consumer (see the 'quick start' section in Kafka's documentation)
  4. From the command line, run python kafka_avro_python_example.py
  5. Because the records are Avro-encoded, you'll see some funky characters in the console consumer. This is expected.
kafka_avro_python_example.py:

import io
import random

import avro.io
import avro.schema
import lipsum
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

# Lorem-ipsum generator used to fill in dummy field values
g = lipsum.Generator()

kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)

# Path to the user.avsc Avro schema -- replace with your own
schema_path = "/Your/schema/path/user.avsc"

# Kafka topic to publish to
topic = "test"

schema = avro.schema.parse(open(schema_path).read())

# The DatumWriter is stateless for a given schema, so create it once
writer = avro.io.DatumWriter(schema)

for i in xrange(2000):
    # Serialize one record to Avro binary in an in-memory buffer
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer.write({"name": g.generate_sentence(),
                  "favorite_color": g.generate_sentence(),
                  "favorite_number": random.randint(0, 10)},
                 encoder)
    raw_bytes = bytes_writer.getvalue()
    producer.send_messages(topic, raw_bytes)
user.avsc:

{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
@jeffreysoon: how to tell which partition your data is sent to?
