Skip to content

Instantly share code, notes, and snippets.

@dpkp
Created June 4, 2016 21:14
Show Gist options
  • Save dpkp/452ea2080d54bb615ae4779851ada689 to your computer and use it in GitHub Desktop.
Save dpkp/452ea2080d54bb615ae4779851ada689 to your computer and use it in GitHub Desktop.
get specific kafka message
#!/usr/bin/python
import argparse
import socket
import time
from kafka.conn import BrokerConnection
from kafka.protocol.fetch import FetchRequest_v1
def get_message(host, port, topic, partition, offset, max_bytes=1048576):
conn = BrokerConnection(host, port, socket.AF_UNSPEC)
conn.connect()
while conn.connecting():
time.sleep(0.5)
conn.connect()
assert conn.connected(), 'Unable to connect to broker at {}:{}'.format(host, port)
req = FetchRequest_v1(replica_id=-1, max_wait_time=500, min_bytes=1,
topics=[(topic, [(partition, offset, max_bytes)])])
f = conn.send(req)
conn.recv()
while not f.is_done:
time.sleep(0.5)
conn.recv()
assert f.success, 'FetchRequest failed: {}'.format(f.exception)
resp = f.value
print(resp.topics[0])
def main():
parser = argparse.ArgumentParser()
parser.add_argument('host')
parser.add_argument('port', type=int)
parser.add_argument('topic')
parser.add_argument('partition', type=int)
parser.add_argument('offset', type=int)
args = parser.parse_args()
get_message(args.host, args.port, args.topic, args.partition, args.offset)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment