Skip to content

Instantly share code, notes, and snippets.

@linar-jether
Created June 22, 2017 13:36
Show Gist options
  • Save linar-jether/19a6598a2334e4e8eec2e920e9bccb8c to your computer and use it in GitHub Desktop.
Save linar-jether/19a6598a2334e4e8eec2e920e9bccb8c to your computer and use it in GitHub Desktop.
Celery task monitor, logs task state to MongoDB
import pickle
import threading
from Queue import Queue
import time
from bson import InvalidDocument
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
from jether_cloud import task_queue
from celery.result import AsyncResult
from JERDB import JERDB
import arrow
import pandas as pd
# Celery Task monitor, stores all received task states to a capped mongodb collection
class MonitorThread(object):
    """Capture Celery task events and persist task snapshots to MongoDB.

    Every task event received from the Celery event stream is merged into an
    in-memory ``State``; the resulting task snapshot is put on a queue. A
    daemon thread drains that queue and inserts the snapshots into the
    ``Celery_log`` collection of the ``JER_DB`` database.
    """

    # Snapshot fields converted with ``arrow`` to datetime objects before
    # the snapshot is queued for storage.
    _TIME_FIELDS = ('received', 'timestamp', 'started', 'succeeded')
    _COLLECTION = 'Celery_log'

    def __init__(self, celery_app, interval=1, insert_interval=60):
        """Set up event state, the record queue and the writer thread.

        :param celery_app: Celery application whose events are captured.
        :param interval: seconds to sleep before reconnecting after the
            event capture fails.
        :param insert_interval: seconds the writer thread sleeps after each
            insert attempt (previously hard-coded to 60).
        """
        self.celery_app = celery_app
        self.interval = interval
        self.insert_interval = insert_interval
        # Change max_tasks_in_memory to handle long running tasks (allows
        # correlating events by task id). If the task state has been evicted
        # from memory, new task events for the same task id will only
        # include the id and event specific info.
        self.state = self.celery_app.events.State(max_tasks_in_memory=50000)
        self.queue = Queue()
        # The writer thread uses self.db, so it must exist before the
        # thread is started.
        self.db = JERDB.db('JER_DB')
        # Insert log events in the background so that database writes do
        # not block the event handler.
        self.thread = threading.Thread(target=self.insert_records, args=())
        self.thread.daemon = True
        self.thread.start()

    def catchall(self, event):
        """Handle one Celery event and queue the updated task snapshot.

        Worker heartbeats and events that carry no ``uuid`` are ignored.

        :param event: event dict as delivered by the event receiver.
        """
        if event['type'] == 'worker-heartbeat' or 'uuid' not in event:
            return

        task_id = event['uuid']
        self.state.event(event)
        update_event = self.state.tasks[task_id].as_dict()

        # A '...' inside the result is treated as a sign that the event
        # carried a shortened result, in which case the full value is read
        # from the result backend. Otherwise, for failed tasks, the
        # exception object is fetched and stored pickled.
        result_truncated = '...' in str(update_event.get('result', ''))
        has_exception = bool(update_event.get('exception', ''))
        if result_truncated or has_exception:
            async_result = AsyncResult(task_id)
            if async_result.ready():
                if result_truncated:
                    update_event['result'] = async_result.get(
                        10, propagate=False)
                else:
                    try:
                        update_event['exception_binary'] = pickle.dumps(
                            async_result.get(1, propagate=False))
                    except Exception:
                        # Typically a timeout while fetching the exception;
                        # logger.exception already records the traceback.
                        logger.exception(
                            "Timeout while getting exception_binary")

        # Store only the worker's hostname. The snapshot may have no worker
        # attached (presumably for client-side events such as task-sent;
        # confirm against celery.events.state), so do not assume the
        # attribute exists.
        update_event['worker'] = getattr(
            update_event.get('worker'), 'hostname', None)

        # Drop empty/falsy fields. Pandas objects are kept without testing
        # their truth value.
        update_event = {
            key: val for key, val in update_event.items()
            if isinstance(val, pd.core.base.PandasObject) or val
        }

        for time_field in self._TIME_FIELDS:
            if update_event.get(time_field):
                update_event[time_field] = arrow.get(
                    update_event[time_field]).datetime

        self.queue.put(update_event)

    def insert_records(self):
        """Writer-thread loop: drain the queue and insert into MongoDB.

        Blocks until at least one record is queued, collects everything that
        is currently queued and inserts it with one ``insert_many`` call. If
        the batch is rejected as an invalid document, records are inserted
        one by one. On any other failure the batch is put back on the queue.
        The loop sleeps ``insert_interval`` seconds after every attempt.
        """
        while True:
            # Reset per iteration so the error handler never sees a batch
            # left over from a previous pass.
            bulk = []
            try:
                bulk.append(self.queue.get())
                while not self.queue.empty():
                    bulk.append(self.queue.get_nowait())

                logger.info('inserting %d records', len(bulk))
                try:
                    self.db.get_collection(self._COLLECTION).insert_many(
                        bulk, ordered=False)
                except InvalidDocument:
                    # In case we got an invalid document, try to insert the
                    # records individually and replace objects with repr.
                    self._insert_individually(bulk)
            except Exception:
                logger.exception("Insert to mongo failed")
                # NOTE(review): re-queued documents may already carry an
                # "_id" assigned by the failed insert_many, and some of them
                # may have been written; confirm retries cannot loop on
                # duplicate keys.
                for doc in bulk:
                    self.queue.put_nowait(doc)
            finally:
                time.sleep(self.insert_interval)

    def _insert_individually(self, docs):
        """Insert *docs* one at a time, retrying with ``repr`` of the result.

        A record that still fails after its ``result`` field has been
        replaced by its ``repr`` is logged and skipped.
        """
        collection = self.db.get_collection(self._COLLECTION)
        for doc in docs:
            doc.pop("_id", None)
            try:
                collection.insert_one(doc)
            except Exception:
                try:
                    doc['result'] = repr(doc['result'])
                    collection.insert_one(doc)
                except Exception:
                    logger.exception("Insert to mongo failed")

    def run(self):
        """Capture Celery events until interrupted, reconnecting on errors.

        Every received event is dispatched to :meth:`catchall`. When the
        capture fails, the error is reported and the connection is
        re-established after ``interval`` seconds. ``KeyboardInterrupt`` and
        ``SystemExit`` are forwarded to the main thread and end the loop.
        """
        while True:
            try:
                with self.celery_app.connection() as connection:
                    recv = self.celery_app.events.Receiver(
                        connection, handlers={'*': self.catchall})
                    recv.capture(limit=None, timeout=None, wakeup=True)
            except (KeyboardInterrupt, SystemExit):
                try:
                    import _thread as thread
                except ImportError:
                    import thread
                thread.interrupt_main()
                # Stop capturing instead of re-entering the loop; the
                # interrupt has been handed over to the main thread.
                return
            except Exception as e:
                import traceback
                traceback.print_exc()
                logger.error("Failed to capture events: '%s', "
                             "trying again in %s seconds.",
                             e, self.interval)
                logger.debug(e, exc_info=True)
                time.sleep(self.interval)
def get_celery_app():
    """Return the ``app`` object exposed by the ``task_queue`` module."""
    celery_app = task_queue.app
    return celery_app
if __name__ == '__main__':
    import logging

    # Timestamped log lines that include the thread name, at INFO level.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s",
        datefmt='%Y-%m-%d %H:%M:%S',
    )

    # Build the monitor for the application returned by get_celery_app()
    # and start capturing events in this thread.
    monitor = MonitorThread(get_celery_app())
    monitor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment