Skip to content

Instantly share code, notes, and snippets.

@dnwe
Created August 11, 2016 13:57
Show Gist options
  • Save dnwe/314caca79e779e6356d7a08b076ea901 to your computer and use it in GitHub Desktop.
Save dnwe/314caca79e779e6356d7a08b076ea901 to your computer and use it in GitHub Desktop.
Simple proof of concept for using kafka-python to talk to IBM Message Hub
#!/usr/bin/python3
"""
Simple proof of concept for using kafka-python to talk to IBM Message Hub
https://gist.github.com/dnwe
http://kafka-python.readthedocs.io/
"""
import os
import ssl
import sys
import json
import asyncio
import requests
from kafka import KafkaConsumer, KafkaProducer
MESSAGEHUB_SERVICE_KEY = 'messagehub'
MESSAGEHUB_TOPIC = 'python'
NUM_ROUNDTRIPS = 10
def get_credentials():
if os.environ.get('VCAP_SERVICES'):
vcap_services = json.loads(os.environ.get('VCAP_SERVICES'))
for vcap_service in vcap_services:
if vcap_service.startswith(MESSAGEHUB_SERVICE_KEY):
messagehub_service = vcap_services[vcap_service][0]
return messagehub_service['credentials']
raise Exception('Not "{}" service found in VCAP_SERVICES'.format(
MESSAGEHUB_SERVICE_KEY))
def get_producer(configs):
return KafkaProducer(
value_serializer=lambda m: json.dumps(m).encode('utf8'),
**configs)
def get_consumer(configs):
consumer = KafkaConsumer(
MESSAGEHUB_TOPIC,
value_deserializer=lambda m: json.loads(m.decode('utf8')),
**configs)
consumer.poll()
return consumer
def get_or_create_topic(topic, credentials):
api_key = str(credentials['api_key'])
kafka_admin_url = str(credentials['kafka_admin_url'])
session = requests.Session()
r = session.get('{}/admin/topics'.format(kafka_admin_url),
headers={'X-Auth-Token': api_key})
if r.status_code != 200:
raise Exception(r.headers, r.content)
topics = r.json()
if MESSAGEHUB_TOPIC in [x['name'] for x in topics]:
print('{} topic already exists in kafka'.format(MESSAGEHUB_TOPIC))
else:
print('creating {} topic'.format(MESSAGEHUB_TOPIC))
r = session.post('{}/admin/topics'.format(kafka_admin_url),
headers={'X-Auth-Token': api_key},
data=json.dumps({'name': MESSAGEHUB_TOPIC,
'partitions': 1}))
if r.status_code != 202: # topic creation pending
raise Exception(r.headers, r.content)
@asyncio.coroutine
def publish(producer):
for num in range(NUM_ROUNDTRIPS):
producer.send(MESSAGEHUB_TOPIC, 'msg-{}'.format(num))
yield from asyncio.sleep(1)
producer.flush()
print('Published all {} messages'.format(NUM_ROUNDTRIPS))
@asyncio.coroutine
def poll(consumer):
while True:
result = consumer.poll()
for tp, records in result.items():
print('Received {} on {}'.format(len(records), tp))
for record in records:
print(record.value)
if str(NUM_ROUNDTRIPS) in record.value:
break
yield from asyncio.sleep(1)
@asyncio.coroutine
def main():
credentials = get_credentials()
get_or_create_topic(MESSAGEHUB_TOPIC, credentials)
configs = {
'bootstrap_servers': credentials['kafka_brokers_sasl'],
'sasl_mechanism': 'PLAIN',
'sasl_plain_username': credentials['api_key'][0:16],
'sasl_plain_password': credentials['api_key'][16:48],
'security_protocol': 'SASL_SSL',
'ssl_context': ssl.create_default_context(),
}
tasks = [
asyncio.async(poll(get_consumer(configs))),
asyncio.async(publish(get_producer(configs))),
]
done, pending = yield from asyncio.wait(tasks)
for future in done | pending:
future.result()
if __name__ == "__main__":
sys.exit(asyncio.get_event_loop().run_until_complete(main()))
@webe3
Copy link

webe3 commented Feb 16, 2017

I'm` not sure what you are trying to do in your poll function with
if str(NUM_ROUNDTRIPS) in record.value:
break
NUM_ROUNDTRIPS was set to 10 and you use it in a range to create messages from 0 to 9. The test in poll will never show NUM_ROUNDTRIPS in record .value. Even if it did, the break only breaks out of the records for loop which would have ended anyway.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment