Skip to content

Instantly share code, notes, and snippets.

@MasterAler
Last active February 2, 2022 14:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MasterAler/a3a51d83dc3303ccbac06b718dd22c19 to your computer and use it in GitHub Desktop.
Save MasterAler/a3a51d83dc3303ccbac06b718dd22c19 to your computer and use it in GitHub Desktop.
Simple aiokafka listener with consumer & producer
import asyncio
import logging
import ecs_logging
import sys
import json
from dataclasses import dataclass
from typing import Callable
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.helpers import create_ssl_context
from .config import BOOTSTRAP_SERVERS # <--- that is supposed to be some valid connection string
@dataclass
class Payload:
processor: Callable
consumer_group_id: str
label: str
output_topic_name: str
input_topic_name: str
custom_init: Callable = None
class Listener:
""" Here's a boilerplate class for integration with Kafka """
def __init__(self, payload: Payload):
self.__payload = payload
self.__logger = logging.getLogger()
self.__logger.setLevel(logging.INFO)
self.__handler = logging.StreamHandler(sys.stdout)
self.__handler.setLevel(logging.INFO)
self.__handler.setFormatter(ecs_logging.StdlibFormatter())
self.__logger.addHandler(self.__handler)
if payload.custom_init is not None:
payload.custom_init()
def listen_blocking(self):
asyncio.run(self.listen())
async def listen(self):
consumer = AIOKafkaConsumer(
bootstrap_servers=BOOTSTRAP_SERVERS,
group_id=self.__payload.consumer_group_id,
metadata_max_age_ms=1000, # This controls the polling interval
security_protocol="SSL",
ssl_context=create_ssl_context()
)
producer = AIOKafkaProducer(client_id="Listener",
bootstrap_servers=BOOTSTRAP_SERVERS,
security_protocol="SSL",
ssl_context=create_ssl_context(),
compression_type="gzip")
await consumer.start()
await producer.start()
consumer.subscribe(pattern=INPUT_TOPIC_NAME)
try:
async for msg in consumer: # Will detect metadata changes
self.__logger.info("Consumed msg by[{}] -- {}:{}".format(self.__payload.consumer_group_id, msg.topic, msg.partition))
try:
msg_data = json.loads(msg.value)
data_id = msg_data["data_id"]
self.__logger.info("Group %s processing data %s", self.__payload.consumer_group_id, data_id)
try:
results = None
try:
results = self.__payload.processor(msg_data["SOME_VALID_DATA_KEY"])
except Exception as err:
self.__logger.error(f"ALGO {self.__payload.label} FAILED ON {data_id}, FIX IT: {err}")
continue
for result in results:
result["data_id"] = data_id
await producer.send(self.__payload.output_topic_name,
json.dumps(result).encode("ascii"),
key=data_id.encode("ascii"))
self.__logger.info("Sent {} for {}".format(self.__payload.label, data_id))
except (RuntimeError, ValueError) as err:
self.__logger.error("Result send failed: {}".format(err))
except ValueError as err:
self.__logger.error("Error with input: {}\n{}".format(msg.value, err))
except (RuntimeError, ValueError) as err:
self.__logger.error("Consumer failed: {}".format(err))
finally:
await consumer.stop()
await producer.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment