Skip to content

Instantly share code, notes, and snippets.

@coxley
Last active November 21, 2021 00:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save coxley/14a1e3d98898132fa89c4e025cfa840f to your computer and use it in GitHub Desktop.
Save coxley/14a1e3d98898132fa89c4e025cfa840f to your computer and use it in GitHub Desktop.
Logging handler with a flusher thread running in an asyncio event loop.
import asyncio
import logging
import time
import threading
import typing as t
import janus
# Module-level logger for this file.
LOG = logging.getLogger(__name__)
# Shared janus queue: DBHandler writes to its sync side (sync_q) and the
# flusher coroutine reads from its async side (async_q).
# Queue must be created within the event loop it will be used from. Start as
# None since this will not be the main thread; flusher() assigns it from
# inside its own event loop.
_QUEUE: t.Optional[janus.Queue] = None
class DBHandler(logging.Handler):
    """Logging handler that turns each record into a row and queues it.

    Rows are put on the synchronous side of the module-level ``_QUEUE``;
    the flusher coroutine consumes them from the asynchronous side.

    Constructing the handler blocks until the flusher thread has created
    ``_QUEUE``, so the flusher must already be running (or be started from
    another thread) before ``DBHandler()`` is called.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the handler and bind to the shared queue.

        All arguments are passed through to ``logging.Handler``.
        """
        super().__init__(*args, **kwargs)
        # _QUEUE is set from the flusher thread; poll until it exists.
        # Only read here, so no ``global`` declaration is needed.
        while _QUEUE is None:
            time.sleep(0.01)
        self.q = _QUEUE.sync_q

    def emit(self, record: logging.LogRecord):
        """Convert *record* to a row and send it to the queue for insertion.

        Failures are routed through ``handleError`` (the standard
        ``logging.Handler`` convention) instead of propagating into the
        code that issued the log call.
        """
        try:
            row = make_row(record)
            self.q.put(row)
        except Exception:
            self.handleError(record)
def make_row(record: logging.LogRecord) -> "Row":
    """Convert a log record into a row dict ready for insertion.

    The message is rendered with ``record.getMessage()`` rather than read
    from ``record.message``: that attribute only exists after a
    ``logging.Formatter`` has formatted the record, so reading it on an
    unformatted record (as passed by ``DBHandler.emit``) raises
    ``AttributeError``.

    Args:
        record: The log record to convert.

    Returns:
        A dict with a single ``"msg"`` key holding the rendered message
        (``record.msg`` merged with ``record.args``).
    """
    # Use more columns than this
    return {"msg": record.getMessage()}
def flusher():
    """Thread target: run a private event loop that drains the row queue.

    Creates a new asyncio event loop, installs it as the current loop for
    the calling thread, and runs the drain coroutine on it. The coroutine
    loops forever, so this function does not return normally.
    """

    async def drain():
        global _QUEUE
        # The janus queue has to be created from inside the running loop;
        # only create it if nobody has done so already.
        if _QUEUE is None:
            _QUEUE = janus.Queue()
        async_side = _QUEUE.async_q
        # Upload row instead of print. Perhaps flush every n-seconds w/ buffer
        # to have an upper-bound on inserts.
        while True:
            item = await async_side.get()
            print("woohoo, recived:", item["msg"])

    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    event_loop.run_until_complete(drain())
# Demo. Start the flusher thread first: DBHandler() polls until the flusher
# has created _QUEUE, so constructing the handler before the thread is
# running would never return.
threading.Thread(target=flusher, daemon=True).start()
LOG.setLevel(logging.INFO)
LOG.addHandler(DBHandler())
LOG.info("starting program")
LOG.info("doing some stuff")
LOG.info("mighty cool")
# NOTE(review): the flusher is a daemon thread and nothing here waits for the
# queue to drain, so rows still queued when the main thread exits may never
# be printed -- confirm whether a drain/join step is needed.
# Expected output:
# woohoo, recived: starting program
# woohoo, recived: doing some stuff
# woohoo, recived: mighty cool
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment