Skip to content

Instantly share code, notes, and snippets.

@meyarivan
Created June 9, 2015 22:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save meyarivan/f7cb7d86d0f2ae331e53 to your computer and use it in GitHub Desktop.
Save meyarivan/f7cb7d86d0f2ae331e53 to your computer and use it in GitHub Desktop.
Decode Kafka messages published by Mypipe
#!/usr/bin/env python2
from __future__ import print_function
import os, sys
from kafka import KafkaConsumer
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import io
import json
import struct
def deserialize_kafka_payload(message, schema):
    """Decode one Mypipe-encoded Kafka message into a Python object.

    Each message value starts with a 4-byte header:

    * byte 0: magic byte
    * byte 1: mutation type
    * bytes 2-3: schema id

    The Avro-encoded record follows the header. The mutation type selects
    which branch of ``schema.schemas`` is used to decode the record. The
    magic byte and schema id are parsed but ignored by this script.

    Args:
        message: a consumed Kafka message exposing the raw payload as
            ``.value``.
        schema: the parsed Avro schema. It is presumably a union whose
            ``.schemas`` list is indexed by mutation type; confirm this
            against the schema file.

    Returns:
        The record decoded by ``avro.io.DatumReader``.

    Raises:
        ValueError: if the payload is missing or shorter than the 4-byte
            header, or if the mutation type does not index into
            ``schema.schemas``.
    """
    payload = message.value
    if payload is None or len(payload) < 4:
        raise ValueError("Mypipe payload too short: expected at least 4 "
                         "header bytes, got %r" % (payload,))
    # Use an explicit struct format rather than ord(): it works on both
    # Python 2 str and Python 3 bytes. '>' forces big-endian instead of the
    # host's native order. NOTE(review): big-endian is assumed because the
    # header is presumably written by a JVM ByteBuffer; confirm against
    # Mypipe.
    _magic, mutation_type, _schema_id = struct.unpack(">BBh", payload[:4])
    branches = schema.schemas
    if mutation_type >= len(branches):
        raise ValueError("Unknown mutation type %d (schema has %d branches)"
                         % (mutation_type, len(branches)))
    decoder = avro.io.BinaryDecoder(io.BytesIO(payload[4:]))
    reader = avro.io.DatumReader(branches[mutation_type])
    return reader.read(decoder)
def main(schema_file, topic, group_id, brokers, callback):
    """Consume *topic* and pass each decoded Mypipe record to *callback*.

    Args:
        schema_file: path to the Avro schema file used to decode payloads.
        topic: Kafka topic to subscribe to.
        group_id: Kafka consumer group id.
        brokers: list of broker address strings (taken from the command
            line).
        callback: called once with each decoded record, in consumption
            order.
    """
    # Close the schema file deterministically instead of leaking the
    # handle until garbage collection.
    with open(schema_file, 'r') as schema_fp:
        schema = avro.schema.parse(schema_fp.read())
    # NOTE(review): `metadata_broker_list` is the old kafka-python keyword.
    # Newer releases use `bootstrap_servers` and reject unknown configs, so
    # confirm this against the installed version.
    consumer = KafkaConsumer(topic,
                             group_id=group_id,
                             metadata_broker_list=brokers)
    for message in consumer:
        callback(deserialize_kafka_payload(message, schema))
if __name__ == '__main__':
    # argv must hold: script, schema_file, topic, group id, and at least one
    # broker. That is 5 entries, since the usage marks broker1 as mandatory.
    if len(sys.argv) < 5:
        # `print` is a function here (print_function is imported at the top
        # of the file), so stderr must be selected with file=, not `>>`.
        print("Usage: %s schema_file topic "
              "kafka_consumer_group_id broker1 "
              "[broker2 broker3 ..]" % sys.argv[0],
              file=sys.stderr)
        sys.exit(2)
    main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4:], print)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment