@deepaksood619
Created July 11, 2019 06:31
import base64
import http.client as httplib
import logging
import os
import time
import traceback
import zlib
from confluent_kafka import Consumer, KafkaError, Producer
# logging settings: read DEBUG from the environment instead of eval-ing it,
# so a missing or non-boolean value cannot raise or execute arbitrary code
if os.environ.get('DEBUG', 'False').lower() in ('1', 'true', 'yes'):
    logging.basicConfig(level=logging.DEBUG)
else:
    logging.basicConfig(level=logging.INFO)
consumer_config = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS'),
    'group.id': os.environ.get('KAFKA_SMAP_CONSUMER_CONFIG_GROUP_ID'),
    'auto.offset.reset': os.environ.get('KAFKA_SMAP_CONSUMER_CONFIG_AUTO_OFFSET_RESET'),  # earliest/latest
    'enable.auto.commit': 'false',
    # for limiting the amount of messages pre-fetched by librdkafka
    'queued.max.messages.kbytes': os.environ.get('KAFKA_SMAP_CONSUMER_CONFIG_QUEUED_MAX_MESSAGES_KBYTES'),
    'fetch.message.max.bytes': os.environ.get('KAFKA_SMAP_CONSUMER_CONFIG_FETCH_MESSSAGE_MAX_BYTES')
}
c = Consumer(consumer_config)
p = Producer({
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS')
})
timeout_seconds = 1
SMAP_ARCHIVER_IP = os.environ.get('KAFKA_SMAP_ARCHIVER_IP')
SMAP_ARCHIVER_PORT = os.environ.get('KAFKA_SMAP_ARCHIVER_PORT')
topic = os.environ.get('KAFKA_SMAP_SUBSCRIBE_TOPIC')
# if no topic found
if not topic:
    logging.error('No topic: {}'.format(topic))
    exit(0)
# callbacks
def print_on_assign(consumer, partitions):
    logging.info(f'Assignment: {partitions}')
    for partition in partitions:
        logging.info(f'watermark: {c.get_watermark_offsets(partition=partition)}')
    logging.info(f'committed offsets for all partitions: {c.committed(partitions=partitions)}')
    logging.info(f'position: {c.position(partitions=partitions)}')


def print_on_revoke(consumer, partitions):
    logging.info(f'Revoke Assignment: {partitions}')


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        # raise error and handle using exception
        raise SystemError(err)
    else:
        logging.debug(f'Message delivered topic:{msg.topic()} partition:{msg.partition()} offset:{msg.offset()}')
c.subscribe([topic], on_assign=print_on_assign, on_revoke=print_on_revoke)
logging.warning('New instance of Kafka SMAP Consumer started')
while True:
    msg = c.poll(1.0)

    # initial error handling
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            logging.error(f'consumer error: {msg.error()}')
            break

    logging.debug(f'{msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

    try:
        final_data = msg.value()
        logging.debug(f'final_data: {final_data}')

        try:
            # decompress the gzip-json value
            final_data = zlib.decompress(final_data)
            final_data = final_data.decode('utf-8')
        except zlib.error as e:
            # value may be base64-encoded on top of the compression
            logging.debug(f'Error decompressing or uncompressed, Error: {e}')
            final_data = base64.b64decode(final_data)
            final_data = zlib.decompress(final_data)

        # send http request to smap
        timeout_seconds = 1
        while True:
            try:
                conn = httplib.HTTPConnection(SMAP_ARCHIVER_IP, SMAP_ARCHIVER_PORT)
                conn.request('POST', os.environ.get('KAFKA_SMAP_ARCHIVER_ENDPOINT'), final_data,
                             headers={'Content-Type': 'application/json'})
                response = conn.getresponse()
                conn.close()

                if response.status == 200:
                    logging.debug('Data published to smap')
                    c.commit()
                    break
                elif response.status == 500:
                    # republish to error_<archiver> topic
                    logging.error(
                        f'500 - error in creating stream, status - {response.status}, reason - {response.reason}, msg - {response.msg}, final_data - {final_data}')
                    p.produce('error_' + msg.topic(), msg.value(), callback=delivery_report)
                    p.flush()
                    c.commit()
                    break
                else:
                    # for every other error: time out and retry
                    raise ValueError(
                        f'error in creating stream, status - {response.status}, reason - {response.reason}, msg - {response.msg}')
            except Exception as e:
                logging.error(
                    f'error in publishing data to smap. Will not attempt for another {timeout_seconds} seconds. error - {e}')
                # exponential back-off if exception occurred
                time.sleep(timeout_seconds)
                timeout_seconds *= 2
    except UnicodeDecodeError as e:
        logging.error(f'Unicode Decode Error: {e}, data: {msg.value()}')
    except Exception as e:
        try:
            logging.error(f'data/msg: {msg.value()}')
        except Exception as es:
            logging.error(f'cannot print data: {es}')
        # format_exc() returns the traceback as a string; print_exc() would log None here
        logging.error(
            f'global exception occurred: {e}, traceback: {traceback.format_exc()}, Will not attempt for another {timeout_seconds} seconds.')
    else:
        continue

    # exponential back-off if exception occurred
    time.sleep(timeout_seconds)
    timeout_seconds *= 2
logging.info('Kafka SMAP Consumer closed')
c.close()
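
A minimal producer-side sketch (not part of the gist) of how a message could be published so the zlib/base64 handling above decompresses it; the broker address, topic name, and sMAP-style payload are illustrative assumptions:

import json
import zlib

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# hypothetical sMAP-style reading: path -> list of [timestamp_ms, value] pairs
payload = json.dumps({
    '/sensor/temperature': {
        'Readings': [[1562825460000, 23.5]]
    }
}).encode('utf-8')

# zlib-compress the JSON body so the consumer's zlib.decompress() path handles it
producer.produce('smap_topic', zlib.compress(payload))
producer.flush()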
@deepaksood619 (Author)

# Kafka Environment Variables
KAFKA_SMAP_CONSUMER_CONFIG_QUEUED_MAX_MESSAGES_KBYTES=10000
KAFKA_SMAP_CONSUMER_CONFIG_FETCH_MESSSAGE_MAX_BYTES=15728640
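
For completeness, the other environment variables the script reads (names are taken from the code above; the values shown are illustrative placeholders, not the original deployment values):

DEBUG=False
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_SMAP_CONSUMER_CONFIG_GROUP_ID=smap-consumer-group
KAFKA_SMAP_CONSUMER_CONFIG_AUTO_OFFSET_RESET=earliest
KAFKA_SMAP_SUBSCRIBE_TOPIC=smap_topic
KAFKA_SMAP_ARCHIVER_IP=127.0.0.1
KAFKA_SMAP_ARCHIVER_PORT=8079
KAFKA_SMAP_ARCHIVER_ENDPOINT=/add/apikey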
