Created
August 11, 2016 13:57
-
-
Save dnwe/314caca79e779e6356d7a08b076ea901 to your computer and use it in GitHub Desktop.
Simple proof of concept for using kafka-python to talk to IBM Message Hub
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
""" | |
Simple proof of concept for using kafka-python to talk to IBM Message Hub | |
https://gist.github.com/dnwe | |
http://kafka-python.readthedocs.io/ | |
""" | |
import os | |
import ssl | |
import sys | |
import json | |
import asyncio | |
import requests | |
from kafka import KafkaConsumer, KafkaProducer | |
# Prefix used to locate the Message Hub entry among the VCAP_SERVICES keys.
MESSAGEHUB_SERVICE_KEY = 'messagehub'
# Kafka topic that the demo publishes to and consumes from.
MESSAGEHUB_TOPIC = 'python'
# Number of messages the publisher sends (numbered 0 .. NUM_ROUNDTRIPS - 1).
NUM_ROUNDTRIPS = 10
def get_credentials():
    """Return the Message Hub credentials stored in ``VCAP_SERVICES``.

    The ``VCAP_SERVICES`` environment variable is parsed as JSON. The first
    key starting with ``MESSAGEHUB_SERVICE_KEY`` that has at least one
    service instance is selected, and the ``credentials`` mapping of its
    first instance is returned.

    Raises:
        Exception: if ``VCAP_SERVICES`` is unset or empty, or if it contains
            no matching service with at least one instance.
    """
    raw_services = os.environ.get('VCAP_SERVICES')
    if raw_services:
        vcap_services = json.loads(raw_services)
        for service_name, instances in vcap_services.items():
            # Skip a matching key with an empty instance list rather than
            # failing with IndexError on instances[0].
            if service_name.startswith(MESSAGEHUB_SERVICE_KEY) and instances:
                return instances[0]['credentials']
    raise Exception('No "{}" service found in VCAP_SERVICES'.format(
        MESSAGEHUB_SERVICE_KEY))
def get_producer(configs):
    """Create a ``KafkaProducer`` whose values are sent as UTF-8 JSON.

    Args:
        configs: mapping of keyword arguments forwarded unchanged to the
            ``KafkaProducer`` constructor.

    Returns:
        The newly constructed producer.
    """
    def _to_json_bytes(message):
        # Serialize the Python value to JSON text, then to bytes.
        return json.dumps(message).encode('utf8')

    producer = KafkaProducer(value_serializer=_to_json_bytes, **configs)
    return producer
def get_consumer(configs):
    """Create a ``KafkaConsumer`` for ``MESSAGEHUB_TOPIC`` decoding JSON values.

    Args:
        configs: mapping of keyword arguments forwarded unchanged to the
            ``KafkaConsumer`` constructor.

    Returns:
        The consumer, after one initial ``poll()`` whose result is discarded.
    """
    def _from_json_bytes(payload):
        # Decode the raw bytes as UTF-8, then parse the JSON text.
        return json.loads(payload.decode('utf8'))

    subscriber = KafkaConsumer(
        MESSAGEHUB_TOPIC,
        value_deserializer=_from_json_bytes,
        **configs)
    # NOTE(review): presumably this first poll is here to start the
    # subscription before any message is published - confirm.
    subscriber.poll()
    return subscriber
def get_or_create_topic(topic, credentials):
    """Ensure that ``topic`` exists, creating it through the admin REST API.

    The list of topics is fetched from ``<kafka_admin_url>/admin/topics``;
    if ``topic`` is not among them, a creation request for a single-partition
    topic is posted to the same URL.

    Args:
        topic: name of the topic to look up or create.
        credentials: mapping providing ``api_key`` (sent as the
            ``X-Auth-Token`` header) and ``kafka_admin_url``.

    Raises:
        Exception: carrying the response headers and content when the list
            request does not return 200 or the create request does not
            return 202.
    """
    api_key = str(credentials['api_key'])
    topics_url = '{}/admin/topics'.format(str(credentials['kafka_admin_url']))
    auth_headers = {'X-Auth-Token': api_key}

    # The context manager closes the session's pooled connections on exit.
    with requests.Session() as session:
        r = session.get(topics_url, headers=auth_headers)
        if r.status_code != 200:
            raise Exception(r.headers, r.content)

        if any(entry['name'] == topic for entry in r.json()):
            print('{} topic already exists in kafka'.format(topic))
            return

        print('creating {} topic'.format(topic))
        r = session.post(topics_url,
                         headers=auth_headers,
                         data=json.dumps({'name': topic,
                                          'partitions': 1}))
        if r.status_code != 202:  # topic creation pending
            raise Exception(r.headers, r.content)
async def publish(producer):
    """Send ``NUM_ROUNDTRIPS`` messages to ``MESSAGEHUB_TOPIC``, one per second.

    Messages are the strings ``'msg-0'`` .. ``'msg-<NUM_ROUNDTRIPS - 1>'``.
    After the last one is queued the producer is flushed and a summary line
    is printed.

    Written as a native coroutine: the generator-based
    ``@asyncio.coroutine`` decorator was removed in Python 3.11.

    Args:
        producer: object providing ``send(topic, value)`` and ``flush()``.
    """
    for num in range(NUM_ROUNDTRIPS):
        producer.send(MESSAGEHUB_TOPIC, 'msg-{}'.format(num))
        # Yield to the event loop so the polling task can run in between.
        await asyncio.sleep(1)
    producer.flush()
    print('Published all {} messages'.format(NUM_ROUNDTRIPS))
async def poll(consumer):
    """Print incoming records until the final published message is seen.

    The consumer is polled once per second. Every record of every returned
    batch is printed; the coroutine returns after the batch containing the
    last message, i.e. the one whose value is
    ``'msg-<NUM_ROUNDTRIPS - 1>'``.

    The previous check, ``str(NUM_ROUNDTRIPS) in record.value``, could never
    match because messages are numbered from 0 to ``NUM_ROUNDTRIPS - 1``,
    and its ``break`` only left the innermost loop, so this coroutine never
    finished.

    Args:
        consumer: object whose ``poll()`` returns a mapping of
            topic-partition to a list of records with a ``value`` attribute.
    """
    if NUM_ROUNDTRIPS <= 0:
        # Nothing will be published, so there is nothing to wait for.
        return

    final_value = 'msg-{}'.format(NUM_ROUNDTRIPS - 1)
    while True:
        result = consumer.poll()
        saw_final = False
        for tp, records in result.items():
            print('Received {} on {}'.format(len(records), tp))
            for record in records:
                print(record.value)
                if record.value == final_value:
                    saw_final = True
        if saw_final:
            # Stop only after the whole batch has been printed.
            return
        await asyncio.sleep(1)
async def main():
    """Run the round-trip demo: ensure the topic exists, then publish and poll.

    Credentials are read with ``get_credentials()``, the topic is created if
    needed, and a consumer and a producer are built from a shared SASL_SSL
    configuration. The polling and publishing coroutines are scheduled
    concurrently and awaited; any exception raised by either is re-raised
    here. Both Kafka clients are closed before returning.

    ``asyncio.ensure_future`` replaces ``asyncio.async``, which is a syntax
    error from Python 3.7 on because ``async`` became a reserved keyword.
    """
    credentials = get_credentials()
    get_or_create_topic(MESSAGEHUB_TOPIC, credentials)
    configs = {
        'bootstrap_servers': credentials['kafka_brokers_sasl'],
        'sasl_mechanism': 'PLAIN',
        # The API key is split: first 16 characters are used as the user
        # name, the following 32 as the password.
        'sasl_plain_username': credentials['api_key'][0:16],
        'sasl_plain_password': credentials['api_key'][16:48],
        'security_protocol': 'SASL_SSL',
        'ssl_context': ssl.create_default_context(),
    }
    consumer = get_consumer(configs)
    producer = get_producer(configs)
    try:
        tasks = [
            asyncio.ensure_future(poll(consumer)),
            asyncio.ensure_future(publish(producer)),
        ]
        done, pending = await asyncio.wait(tasks)
        for future in done | pending:
            # result() re-raises any exception the task ended with.
            future.result()
    finally:
        producer.close()
        consumer.close()
if __name__ == "__main__":
    # Drive main() to completion on the event loop and use its return value
    # as the process exit status.
    event_loop = asyncio.get_event_loop()
    exit_status = event_loop.run_until_complete(main())
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I'm not sure what you are trying to do in your poll function with
if str(NUM_ROUNDTRIPS) in record.value:
break
NUM_ROUNDTRIPS is set to 10, and you use it in a range to create messages numbered 0 to 9, so the test in poll will never find str(NUM_ROUNDTRIPS) in record.value. Even if it did, the break only exits the inner loop over records, which would have ended anyway.