An operational RPC worker for MongoDB.
#!/usr/bin/env python
# coding: utf-8
"""MongoDB queue and RPC mechanism for remote deferred execution.
To get started with this, create a virtualenv:
virtualenv --python=python2.7 q
cd q
source bin/activate
pip install pymongo pytz futures 'apscheduler<3.0' marrow.package
Download this script into that folder as worker.py. Then open a second shell,
repeat the "cd" and "source" lines above, and in the first terminal run:
python
from worker import consumer
consumer()
In the second terminal run:
python
from worker import producer
producer()
Watch the output of the first terminal, and check the time output on the second
at the end. >:D
"""
# === Imports ===
from __future__ import unicode_literals
from __future__ import print_function
import os
import time
import socket
import pymongo # pip install pymongo
from pymongo.errors import OperationFailure
import pytz # pip install pytz
from marrow.package.canonical import name # pip install marrow.package
from marrow.package.loader import load
from datetime import datetime
from functools import partial
from concurrent import futures # pip install futures (if on Python < 3)
from apscheduler.scheduler import Scheduler # pip install 'apscheduler<3.0'
# === Globals ===
base_worker_query = {'result': {'$exists': False}}
# see: http://api.mongodb.org/python/current/tutorial.html
client = pymongo.MongoClient() # assumes localhost, no authentication
db = client.test
# === Basic Capped Collection Tailer ===
def queue(collection_name, query=None):
"""Capped collection tail utility."""
collections = db.collection_names()
if collection_name not in collections:
db.create_collection(collection_name)
if collection_name + '_' not in collections:
db.create_collection(collection_name + '_', capped=True,
size=50 * 2 ** 20)
waker = db[collection_name + '_']
if not waker.count():
# This is to prevent a terrible infinite busy loop while empty.
waker.insert(dict(nop=True))
last = None
query = query or {}
# Perform the initial query.
cursor = waker.find(query, sort=[('$natural', 1)], slave_ok=True,
tailable=True, await_data=True)
# Primary retry loop.
# If the cursor dies, we need to be able to restart it.
while True:
try:
# Inner record loop.
# We may reach the end of the for loop (timeout waiting for
# new records) at which point we should re-try the for loop as
# long as the cursor is still alive. If it isn't, re-query.
while cursor.alive:
for record in cursor:
# Update the last record ID for later re-querying.
last = record['_id']
yield record
except OperationFailure:
pass
# Re-create the cursor, continuing from where we left off.
retry_query = {"_id": {"$gte": last}}
retry_query.update(query)
cursor = waker.find(retry_query, sort=[('$natural', 1)],
slave_ok=True, tailable=True, await_data=True)
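# A minimal usage sketch (illustrative only): tail a queue directly from a
# Python shell. queue() creates the backing collections on first use, and
# each yielded record is a raw document from the capped waker collection.
#
#     >>> from worker import queue
#     >>> for record in queue('task'):
#     ...     print(record)  # blocks awaiting new records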
# === Exceptions ===
class AcquireFailed(Exception):
"""Unable to lock a task for completion by this worker.
"""
pass
# === Mock Futures Object ===
class MockFuture(object):
"""This pretends to be a Future instance for use in Future-like callbacks.
Includes `mock` classmethod to perform the Future callback steps.
"""
def __init__(self, result=None, exc=None):
self.exc = exc
self.res = result
def exception(self):
return self.exc
def result(self):
if self.exc:
raise self.exc
return self.res
def cancelled(self):
return False
@classmethod
def mock(cls, worker, _id):
try:
result = worker.handler(_id)
        except Exception as e:
worker.callback(_id, cls(exc=e))
return
worker.callback(_id, cls(result))
# === Primary Worker Class ===
class Worker(object):
"""The producer and consumer sides of the MongoDB RPC mechanism.
The job record may be in one of the following states:
* new -- just created, nothing notified yet (scheduled only)
* pending -- ready to be executed / workers notified
* dead -- unable to notify workers
* cancelled -- task has been cancelled (scheduled only)
* running -- a worker is currently processing this task
* finished -- this task has been completed
"""
def __init__(self, collection_name, master=False, development=False):
"""Prepare a worker for use.
Specifically, this calculates the current identity, creates the
collections if missing, seeds the capped collection if needed, and
starts up the job scheduler and thread pool on the consumer-side.
"""
self.identity = (socket.gethostbyname(socket.gethostname()),
os.getppid(), os.getpid())
self.connected = False
self.db = None
self.collection_name = collection_name
self.master = master
self.development = development
if development or not master:
self.executor = futures.ThreadPoolExecutor(max_workers=10)
self.scheduler = Scheduler()
self.scheduler.start()
def _connect(self):
self.jobs = db[self.collection_name]
self.waker = db[self.collection_name + '_']
self.connected = True
def submit(self, obj, *args, **kw):
"""Submit a job for immediate background execution.
Arguments are the callable to execute (any callable other than a bound
method), positional, and keyword arguments.
This method creates a job record in the primary collection (containing
our identity, the callable, args/kwargs, a timestamp, and state of
"pending") then inserts the job ID into the waker queue / capped
collection.
"""
if not self.connected:
self._connect()
obj = name(obj)
# Submit the job to the job collection.
job_id = self.jobs.insert(dict(
creator=self.identity,
callable=obj,
args=args,
kwargs=kw,
created=datetime.utcnow(),
state="pending",
owner=None
), safe=True)
# Notify workers of the new job.
try:
self.waker.insert(dict(
creator=self.identity,
job_id=job_id
), safe=True)
except:
self.jobs.update({'_id': job_id}, {'$set': dict(state="dead")})
raise
if self.master and self.development:
self._call_immediate(job_id)
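    # A minimal producer-side sketch (illustrative; hello() is defined near
    # the bottom of this module):
    #
    #     >>> worker = Worker('task', master=True)
    #     >>> worker.submit(hello, 'world')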
def at(self, dt, obj, *args, **kw):
"""Submit a job for execution at a specific date/time.
The first argument is a datetime instance, the remainder are the same
as the submit method.
"""
if not self.connected:
self._connect()
obj = name(obj)
# Submit the job to the job collection.
job_id = self.jobs.insert(dict(
creator=self.identity,
callable=obj,
args=args,
kwargs=kw,
created=datetime.utcnow(),
state="new",
owner=None,
when=dt
), safe=True)
# Notify workers of the new job.
try:
self.waker.insert(dict(
creator=self.identity,
job_id=job_id,
when=dt
), safe=True)
except:
self.jobs.update({'_id': job_id}, {'$set': dict(state="dead")})
raise
else:
self.jobs.update({'_id': job_id}, {'$set': dict(state="pending")})
if self.master and self.development:
self._call_scheduled(job_id, dt)
return job_id
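    # A minimal scheduling sketch (illustrative): run hello() five minutes
    # from now. The "dt" argument is naive UTC, matching datetime.utcnow().
    #
    #     >>> from datetime import timedelta
    #     >>> worker.at(datetime.utcnow() + timedelta(minutes=5), hello, 'world')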
def reschedule(self, id_, dt):
if not self.connected:
self._connect()
original = self.jobs.find_one({'_id': id_})
if not original:
raise Exception("Original job not found: " + str(id_))
if original['state'] != 'pending':
raise Exception("Can not reschedule task in state: " +
original['state'])
# Cancel the original job.
        self.jobs.update({'_id': id_}, {'$set': dict(state="cancelled")})
# Submit the job to the job collection again.
job_id = self.jobs.insert(dict(
creator=self.identity,
callable=original['callable'],
args=original['args'],
            kwargs=original['kwargs'],
created=datetime.utcnow(),
state="new",
owner=None,
when=dt
), safe=True)
# Notify workers of the new job.
try:
self.waker.insert(dict(
creator=self.identity,
job_id=job_id,
when=dt
), safe=True)
except:
self.jobs.update({'_id': job_id}, {'$set': dict(state="dead")})
raise
else:
self.jobs.update({'_id': job_id}, {'$set': dict(state="pending")})
if self.master and self.development:
self._call_scheduled(job_id, dt)
return job_id
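    # A minimal rescheduling sketch (illustrative): push a still-pending job
    # back by an hour. The original record is cancelled and a new ID returned.
    #
    #     >>> from datetime import timedelta
    #     >>> new_id = worker.reschedule(job_id, datetime.utcnow() + timedelta(hours=1))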
def _call_immediate(self, id_):
# Prepare a job handler for near-immediate execution.
f = self.executor.submit(self.handler, id_)
# Register our callback so we can correctly reply to
# the job upon completion.
f.add_done_callback(partial(self.callback, id_))
def _call_scheduled(self, id_, when):
# Convert UTC to naive local time for scheduling.
utc = pytz.UTC.localize(when)
native = pytz.timezone('Canada/Eastern')
# Schedule the job.
self.scheduler.add_date_job(MockFuture.mock,
utc.astimezone(native).replace(tzinfo=None),
[self, id_])
def __call__(self):
"""Primary worker, usually called within a process + thread pool.
"""
if not self.connected:
self._connect()
        # We skip completion notifications (records that carry a result).
for record in queue(self.collection_name, base_worker_query):
# Skip no-ops. We don't exclude them from the query, above,
# because for cursor tailing to work we need to see at least one
# record.
if 'nop' in record:
continue
            # Handle immediate RPC jobs.
if 'when' not in record:
self._call_immediate(record['job_id'])
continue
# Handle scheduled jobs.
# Skip the job if it is stale.
if record['when'] < datetime.utcnow():
continue
self._call_scheduled(record['job_id'], record['when'])
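    # A minimal consumer sketch (illustrative; equivalent to the consumer()
    # helper at the bottom of this module). The call blocks, tailing the
    # waker collection for work:
    #
    #     >>> Worker('task')()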
def handler(self, job_id):
"""Job execution handler.
This accepts the job (if possible) and handles saving the reply back
to the database.
"""
# Build the dictionary update.
update = dict(
acquired=datetime.utcnow(),
state="running",
owner=self.identity
)
# Attempt to acquire the job.
try:
# This will fail in one of two ways, handled below, if we can't
# acquire the lock in time.
result = self.jobs.update(
dict(_id=job_id, state="pending", owner=None),
{"$set": update},
safe=True
)
except:
# There was an actual error attempting to update the job.
raise AcquireFailed()
# If we didn't manage to update any records, someone else has acquired
# the lock before us.
if not result['updatedExisting']:
raise AcquireFailed()
try:
# Load the parts of the job record we need to execute it.
job = self.jobs.find(dict(_id=job_id), limit=1,
fields=['callable', 'args', 'kwargs'])[0]
except:
# This should, in theory, never happen unless MongoDB goes away.
raise AcquireFailed()
# Load the correct callable object. Marrow Package does this for us.
obj = load(job['callable'])
        # The args and kwargs fields may be missing; default to empty.
args = job.get('args', [])
kwargs = job.get('kwargs', {})
return obj(*args, **kwargs)
def callback(self, job_id, future):
"""Executed upon completion of a job.
"""
        result = None
        exc = None
        # If the exception is AcquireFailed, we silently ignore the result.
        try:
            result = future.result()
        except AcquireFailed:
            return
        except Exception as e:
            exc = e
try:
# Update the job record to include the results and new state.
result = self.jobs.update(
dict(_id=job_id, state="running", owner=self.identity),
{"$set": dict(
state="finished",
completed=datetime.utcnow(),
cancelled=future.cancelled(),
                result=result,
                # Exception instances are not BSON-serializable; store text.
                exception=unicode(exc) if exc else None
)},
safe=True
)
if not result['updatedExisting']:
raise Exception()
except:
            # If we failed, someone else completed the job or the state has
            # changed!
return
# Send notification of completion.
self.waker.insert(dict(creator=self.identity, job_id=job_id,
result=True))
def consumer():
"""In one terminal open a python shell and run:
>>> from worker import consumer
>>> consumer() # Will block here. Clear your scrollback buffer and prepare. ;)
"""
worker = Worker('task', False, False)
worker() # Blocks.
def hello(name):
"""Simple function to RPC."""
print("Hello, ", name, "!", sep='')
def producer():
"""In another terminal open a python shell and run:
>>> from worker import producer
>>> producer() # Will block until 1000 tasks are added.
"""
worker = Worker('task', True, False)
pid = os.getpid() * 10000
start = time.time()
for i in xrange(1000):
worker.submit(hello, pid + i)
duration = time.time() - start
print("1000 tasks submitted in", duration, "seconds.", duration / 1000, "seconds per task.")
if __name__ == '__main__':
# This script is executable. chmod +x it and you can run ./worker.py to start a worker.
consumer()
For the presentation slides, see: https://gist.github.com/amcgregor/4207375
