@ycd
Created February 2, 2021 23:10
Async logger for Kafka
import asyncio
import json
import logging
from threading import Thread
from typing import Any, Dict

import confluent_kafka
from confluent_kafka.cimpl import KafkaException


class AIOProducer:
    """Confluent Kafka producer whose produce() method returns an awaitable future."""

    def __init__(self, *, configs: Dict[str, Any], topic: str, loop: asyncio.AbstractEventLoop = None):
        self._loop = loop or asyncio.get_event_loop()
        self._producer = confluent_kafka.Producer(configs)
        self.topic = topic
        self._cancelled = False
        # Serve delivery callbacks from a dedicated thread so produce() never blocks.
        self._poll_thread = Thread(target=self._poll_loop)
        self._poll_thread.start()

    def _poll_loop(self):
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        # Deliver any queued messages before stopping the poll thread.
        self._producer.flush()
        self._cancelled = True
        self._poll_thread.join()

    def produce(self, value):
        """An awaitable produce method: returns a future resolved by the delivery callback."""
        result = self._loop.create_future()

        def ack(err, msg):
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, KafkaException(err)
                )
            else:
                self._loop.call_soon_threadsafe(result.set_result, msg)

        if isinstance(value, dict):
            value = json.dumps(value)
        self._producer.produce(self.topic, value, on_delivery=ack)
        return result
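
# Not part of the original gist: a minimal sketch showing that produce() can be
# awaited directly from a coroutine for per-message delivery confirmation
# (assumes a reachable broker and that the producer was created on the running loop).
async def send_one(producer: AIOProducer) -> None:
    # The returned future resolves to the delivered confluent_kafka.Message,
    # or raises a KafkaException if delivery failed.
    msg = await producer.produce({"event": "example"})
    print("delivered to", msg.topic(), "partition", msg.partition(), "offset", msg.offset())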


class KafkaLogger(logging.Handler):
    def __init__(self, *, configs: Dict[str, Any], topic: str = "kafka-test"):
        """Initialize an instance of the Kafka handler."""
        logging.Handler.__init__(self)
        self.producer = AIOProducer(configs=configs, topic=topic)

    def emit(self, record):
        """Emit the provided record through the Kafka producer."""
        # Drop records emitted by the Kafka client itself to avoid infinite recursion.
        if "kafka." in record.name:
            return
        try:
            log = self.format(record)
            print(log)
            self.producer.produce({"log": log})
        except Exception:
            logging.Handler.handleError(self, record)

    def close(self):
        """Close the producer and clean up."""
        self.acquire()
        try:
            if self.producer:
                self.producer.close()
            logging.Handler.close(self)
        finally:
            self.release()

kafka_config = {
    "bootstrap.servers": "0.0.0.0:9092",
}

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

kl = KafkaLogger(configs=kafka_config, topic="kafka-test")
logger.addHandler(kl)
logger.info("testing the logger 123")
logger.debug("321 reggol eht gnitset")
logger.info("testing the logger 123")
logger.debug("321 reggol eht gnitset")
logger.info("testing the logger 123")
logger.debug("321 reggol eht gnitset")
logger.info("testing the logger 123")
logger.debug("321 reggol eht gnitset")
logger.info("testing the logger 123")
logger.debug("321 reggol eht gnitset")
logger.info("testing the logger 123")
logger.debug("321 reggol eht gnitset")
logger.info("testing the logger 123")
logger.debug("321 reggol eht gnitset")
logger.info("testing the logger 123")
logger.debug("321 reggol eht gnitset")
logger.info("testing the logger 123")
logger.debug("321 reggol eht gnitset")
logger.info("testing the logger 123")
logger.debug("321 reggol eht gnitset")