Created
June 4, 2016 21:14
-
-
Save dpkp/452ea2080d54bb615ae4779851ada689 to your computer and use it in GitHub Desktop.
get specific kafka message
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import argparse | |
import socket | |
import time | |
from kafka.conn import BrokerConnection | |
from kafka.protocol.fetch import FetchRequest_v1 | |
def get_message(host, port, topic, partition, offset, max_bytes=1048576): | |
conn = BrokerConnection(host, port, socket.AF_UNSPEC) | |
conn.connect() | |
while conn.connecting(): | |
time.sleep(0.5) | |
conn.connect() | |
assert conn.connected(), 'Unable to connect to broker at {}:{}'.format(host, port) | |
req = FetchRequest_v1(replica_id=-1, max_wait_time=500, min_bytes=1, | |
topics=[(topic, [(partition, offset, max_bytes)])]) | |
f = conn.send(req) | |
conn.recv() | |
while not f.is_done: | |
time.sleep(0.5) | |
conn.recv() | |
assert f.success, 'FetchRequest failed: {}'.format(f.exception) | |
resp = f.value | |
print(resp.topics[0]) | |
def main(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument('host') | |
parser.add_argument('port', type=int) | |
parser.add_argument('topic') | |
parser.add_argument('partition', type=int) | |
parser.add_argument('offset', type=int) | |
args = parser.parse_args() | |
get_message(args.host, args.port, args.topic, args.partition, args.offset) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment