Last active
August 29, 2015 14:07
-
-
Save amcgregor/52a684854fb77b6e7395 to your computer and use it in GitHub Desktop.
An operational RPC worker for MongoDB.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding: utf-8 | |
"""MongoDB queue and RPC mechanism for remote deferred execution. | |
To get started with this, create a virtualenv: | |
virtualenv --python=python2.7 q | |
cd q | |
source bin/activate | |
pip install pymongo pytz futures 'apscheduler<3.0' | |
Download this script to that folder. Then, open a second shell, repeat the | |
"cd" and "source" lines above, and in the first run: | |
python | |
from worker import consumer | |
consumer() | |
In the second terminal run: | |
python | |
from worker import producer | |
producer() | |
Watch the output of the first terminal, and check the time output on the second | |
at the end. >:D | |
""" | |
# === Imports === | |
from __future__ import unicode_literals | |
from __future__ import print_function | |
import os | |
import time | |
import socket | |
import pymongo # pip install pymongo | |
import pytz # pip install pytz | |
from marrow.package.canonical import name | |
from marrow.package.loader import load | |
from datetime import datetime | |
from functools import partial | |
from concurrent import futures # pip install futures (if on Python < 3) | |
from apscheduler.scheduler import Scheduler # pip install 'apscheduler<3.0' | |
# === Globals ===

# Matches waker records that carry no result yet -- i.e. job notifications,
# not completion replies (which are inserted with result=True).
base_worker_query = {'result': {'$exists': False}}

# see: http://api.mongodb.org/python/current/tutorial.html
client = pymongo.MongoClient()  # assumes localhost, no authentication
db = client.test  # all collections live in the default "test" database
# === Basic Capped Collection Tailer === | |
def queue(collection_name, query=None):
    """Capped collection tail utility.

    Generator yielding records from the capped "waker" collection named
    ``collection_name + '_'``, blocking on a tailable cursor and
    transparently re-querying from the last seen record whenever the
    cursor dies.

    :param collection_name: base collection name; both the job collection
        and its capped companion are created on first use
    :param query: optional filter applied to the tailed records
    """
    collections = db.collection_names()

    # Create the job collection and its capped companion if missing.
    if collection_name not in collections:
        db.create_collection(collection_name)

    if collection_name + '_' not in collections:
        db.create_collection(collection_name + '_', capped=True,
                size=50 * 2 ** 20)  # 50 MiB ring buffer

    waker = db[collection_name + '_']

    if not waker.count():
        # This is to prevent a terrible infinite busy loop while empty.
        waker.insert(dict(nop=True))

    last = None
    query = query or {}

    # Perform the initial query.
    cursor = waker.find(query, sort=[('$natural', 1)], slave_ok=True,
            tailable=True, await_data=True)

    # Primary retry loop.
    # If the cursor dies, we need to be able to restart it.
    while True:
        try:
            # Inner record loop.
            # We may reach the end of the for loop (timeout waiting for
            # new records) at which point we should re-try the for loop as
            # long as the cursor is still alive.  If it isn't, re-query.
            while cursor.alive:
                for record in cursor:
                    # Update the last record ID for later re-querying.
                    last = record['_id']
                    yield record
        except pymongo.errors.OperationFailure:
            # Fully qualified: the bare name OperationFailure was never
            # imported, making the original handler itself a NameError.
            pass

        # Re-create the cursor, continuing from where we left off.
        retry_query = {"_id": {"$gte": last}}
        retry_query.update(query)
        cursor = waker.find(retry_query, sort=[('$natural', 1)],
                slave_ok=True, tailable=True, await_data=True)
# === Exceptions === | |
class AcquireFailed(Exception):
    """Raised when this worker loses the race to lock a task for execution."""
# === Mock Futures Object === | |
class MockFuture(object):
    """Pretends to be a Future instance for use in Future-like callbacks.

    Includes a `mock` classmethod to perform the Future callback steps
    synchronously (the scheduler calls plain functions, not the executor).
    """

    def __init__(self, result=None, exc=None):
        """Capture either a result or an exception, mirroring a real Future."""
        self.exc = exc
        self.res = result

    def exception(self):
        """Return the captured exception, or None on success."""
        return self.exc

    def result(self):
        """Return the captured result, re-raising any captured exception."""
        if self.exc:
            raise self.exc
        return self.res

    def cancelled(self):
        """Mock futures execute immediately, so they are never cancelled."""
        return False

    @classmethod
    def mock(cls, worker, _id):
        """Run ``worker.handler(_id)`` and feed the outcome to its callback.

        The callback receives an instance of this class wrapping either the
        handler's return value or the exception it raised.
        """
        try:
            result = worker.handler(_id)
        except Exception as e:
            # `except ... as` replaces the Python-2-only comma syntax,
            # keeping the module importable on both 2.6+ and 3.
            worker.callback(_id, cls(exc=e))
            return

        worker.callback(_id, cls(result))
# === Primary Worker Class === | |
class Worker(object):
    """The producer and consumer sides of the MongoDB RPC mechanism.

    The job record may be in one of the following states:

    * new -- just created, nothing notified yet (scheduled only)
    * pending -- ready to be executed / workers notified
    * dead -- unable to notify workers
    * cancelled -- task has been cancelled (scheduled only)
    * running -- a worker is currently processing this task
    * finished -- this task has been completed
    """

    def __init__(self, collection_name, master=False, development=False):
        """Prepare a worker for use.

        Specifically, this calculates the current identity and, on the
        consumer side (or a development master), starts up the job
        scheduler and thread pool.  Collection binding is deferred to
        :meth:`_connect`, performed lazily on first use.

        :param collection_name: base job collection name; the capped waker
            collection uses this name plus a trailing underscore
        :param master: True on the producer side
        :param development: True to let a master execute its own jobs
        """
        # (IP address, parent PID, PID) identifies this worker process.
        self.identity = (socket.gethostbyname(socket.gethostname()),
                os.getppid(), os.getpid())
        self.connected = False
        self.db = None
        self.collection_name = collection_name
        self.master = master
        self.development = development

        # Only consumers (and development masters) execute jobs locally.
        if development or not master:
            self.executor = futures.ThreadPoolExecutor(max_workers=10)
            self.scheduler = Scheduler()
            self.scheduler.start()

    def _connect(self):
        # Lazily bind the job and waker collections on first use.
        self.db = db  # module-level connection; was previously never assigned
        self.jobs = db[self.collection_name]
        self.waker = db[self.collection_name + '_']
        self.connected = True

    def submit(self, obj, *args, **kw):
        """Submit a job for immediate background execution.

        Arguments are the callable to execute (any callable other than a
        bound method), positional, and keyword arguments.

        This method creates a job record in the primary collection
        (containing our identity, the callable, args/kwargs, a timestamp,
        and state of "pending") then inserts the job ID into the waker
        queue / capped collection.

        :returns: the new job's ObjectId (for consistency with `at`)
        """
        if not self.connected:
            self._connect()

        obj = name(obj)  # canonical dotted path of the callable

        # Submit the job to the job collection.
        job_id = self.jobs.insert(dict(
                creator=self.identity,
                callable=obj,
                args=args,
                kwargs=kw,
                created=datetime.utcnow(),
                state="pending",
                owner=None
            ), safe=True)

        # Notify workers of the new job.
        try:
            self.waker.insert(dict(
                    creator=self.identity,
                    job_id=job_id
                ), safe=True)
        except Exception:
            # Could not wake anyone; mark the job so it is never picked up.
            self.jobs.update({'_id': job_id}, {'$set': dict(state="dead")})
            raise

        if self.master and self.development:
            self._call_immediate(job_id)

        return job_id

    def at(self, dt, obj, *args, **kw):
        """Submit a job for execution at a specific date/time.

        The first argument is a datetime instance (assumed UTC), the
        remainder are the same as the submit method.

        :returns: the new job's ObjectId
        """
        if not self.connected:
            self._connect()

        obj = name(obj)

        # Submit the job to the job collection.
        job_id = self.jobs.insert(dict(
                creator=self.identity,
                callable=obj,
                args=args,
                kwargs=kw,
                created=datetime.utcnow(),
                state="new",
                owner=None,
                when=dt
            ), safe=True)

        # Notify workers of the new job.
        try:
            self.waker.insert(dict(
                    creator=self.identity,
                    job_id=job_id,
                    when=dt
                ), safe=True)
        except Exception:
            self.jobs.update({'_id': job_id}, {'$set': dict(state="dead")})
            raise
        else:
            # Workers notified; the job is now eligible for execution.
            self.jobs.update({'_id': job_id}, {'$set': dict(state="pending")})

        if self.master and self.development:
            self._call_scheduled(job_id, dt)

        return job_id

    def reschedule(self, id_, dt):
        """Cancel a pending scheduled job and re-submit it for `dt`.

        :param id_: the ObjectId of the job to reschedule
        :param dt: the new execution datetime (assumed UTC)
        :returns: the replacement job's ObjectId
        :raises Exception: if the job is missing or not in "pending" state
        """
        if not self.connected:
            self._connect()

        original = self.jobs.find_one({'_id': id_})

        if not original:
            raise Exception("Original job not found: " + str(id_))

        if original['state'] != 'pending':
            raise Exception("Can not reschedule task in state: " +
                    original['state'])

        # Cancel the original job.  (Was a NameError: referenced job_id
        # before assignment instead of the id_ argument.)
        self.jobs.update({'_id': id_}, {'$set': dict(state="cancelled")})

        # Submit the job to the job collection again.
        job_id = self.jobs.insert(dict(
                creator=self.identity,
                callable=original['callable'],
                args=original['args'],
                # The record field is 'kwargs'; original['kw'] was a KeyError.
                kwargs=original['kwargs'],
                created=datetime.utcnow(),
                state="new",
                owner=None,
                when=dt
            ), safe=True)

        # Notify workers of the new job.
        try:
            self.waker.insert(dict(
                    creator=self.identity,
                    job_id=job_id,
                    when=dt
                ), safe=True)
        except Exception:
            self.jobs.update({'_id': job_id}, {'$set': dict(state="dead")})
            raise
        else:
            self.jobs.update({'_id': job_id}, {'$set': dict(state="pending")})

        if self.master and self.development:
            self._call_scheduled(job_id, dt)

        return job_id

    def _call_immediate(self, id_):
        # Prepare a job handler for near-immediate execution.
        f = self.executor.submit(self.handler, id_)

        # Register our callback so we can correctly reply to
        # the job upon completion.
        f.add_done_callback(partial(self.callback, id_))

    def _call_scheduled(self, id_, when):
        # Convert UTC to naive local time for scheduling.
        # NOTE(review): the local zone is hard-coded to Canada/Eastern --
        # confirm this matches the deployment host before relying on it.
        utc = pytz.UTC.localize(when)
        native = pytz.timezone('Canada/Eastern')

        # Schedule the job; MockFuture.mock drives handler + callback.
        self.scheduler.add_date_job(MockFuture.mock,
                utc.astimezone(native).replace(tzinfo=None),
                [self, id_])

    def __call__(self):
        """Primary worker, usually called within a process + thread pool.

        Blocks forever, tailing the waker collection and dispatching jobs.
        """
        if not self.connected:
            self._connect()

        # We skip responses (waker records carrying a result).
        for record in queue(self.collection_name, base_worker_query):
            # Skip no-ops.  We don't exclude them from the query, above,
            # because for cursor tailing to work we need to see at least
            # one record.
            if 'nop' in record:
                continue

            # Handle immediate RPC jobs.
            if 'when' not in record:
                self._call_immediate(record['job_id'])
                continue

            # Handle scheduled jobs.
            # Skip the job if it is stale.
            if record['when'] < datetime.utcnow():
                continue

            self._call_scheduled(record['job_id'], record['when'])

    def handler(self, job_id):
        """Job execution handler.

        This accepts the job (if possible) and executes it, returning the
        callable's result for `callback` to persist.

        :raises AcquireFailed: when another worker holds the job, or the
            database became unreachable mid-acquisition
        """
        # Build the dictionary update used to lock the job to us.
        update = dict(
                acquired=datetime.utcnow(),
                state="running",
                owner=self.identity
            )

        # Attempt to acquire the job.
        try:
            # This will fail in one of two ways, handled below, if we
            # can't acquire the lock in time.
            result = self.jobs.update(
                    dict(_id=job_id, state="pending", owner=None),
                    {"$set": update},
                    safe=True
                )
        except Exception:
            # There was an actual error attempting to update the job.
            raise AcquireFailed()

        # If we didn't manage to update any records, someone else has
        # acquired the lock before us.
        if not result['updatedExisting']:
            raise AcquireFailed()

        try:
            # Load the parts of the job record we need to execute it.
            job = self.jobs.find(dict(_id=job_id), limit=1,
                    fields=['callable', 'args', 'kwargs'])[0]
        except Exception:
            # This should, in theory, never happen unless MongoDB goes away.
            raise AcquireFailed()

        # Load the correct callable object.  Marrow Package does this for us.
        obj = load(job['callable'])

        # Since args and kwargs may be missing from the record.
        args = job.get('args', [])
        kwargs = job.get('kwargs', {})

        return obj(*args, **kwargs)

    def callback(self, job_id, future):
        """Executed upon completion of a job.

        Persists the result (or exception) back to the job record, then
        notifies interested parties via the waker collection.
        """
        exc = None
        result = None  # was left unbound on the exception path

        # If the exception is AcquireFailed, we silently ignore the result.
        try:
            result = future.result()
        except AcquireFailed:
            return
        except Exception as e:
            # Was `exc = exc`, a no-op that discarded the real exception.
            exc = e

        try:
            # Update the job record to include the results and new state.
            # Exceptions are stored as repr() since live exception objects
            # are not BSON-serializable.
            result = self.jobs.update(
                    dict(_id=job_id, state="running", owner=self.identity),
                    {"$set": dict(
                            state="finished",
                            completed=datetime.utcnow(),
                            cancelled=future.cancelled(),
                            result=result,
                            exception=repr(exc) if exc is not None else None
                        )},
                    safe=True
                )

            if not result['updatedExisting']:
                raise Exception()
        except Exception:
            # If we failed, someone else completed the job or the state
            # has changed!
            return

        # Send notification of completion.
        self.waker.insert(dict(creator=self.identity, job_id=job_id,
                result=True))
def consumer():
    """In one terminal open a python shell and run:

    >>> from worker import consumer
    >>> consumer()  # Will block here. Clear your scrollback buffer and prepare. ;)
    """
    # Build a consumer-side worker and invoke it; this never returns.
    Worker('task', False, False)()
def hello(name):
    """Simple function to RPC."""
    # Assemble the pieces up front, then emit them with no separator.
    parts = ("Hello, ", name, "!")
    print(*parts, sep='')
def producer(count=1000):
    """In another terminal open a python shell and run:

    >>> from worker import producer
    >>> producer()  # Will block until 1000 tasks are added.

    :param count: number of tasks to submit; defaults to 1000, matching
        the original benchmark (backward compatible)
    """
    worker = Worker('task', True, False)

    # Namespace the greetings so concurrent producers stay distinguishable.
    pid = os.getpid() * 10000

    start = time.time()

    # range works on Python 2 and 3 alike; xrange was 2-only.
    for i in range(count):
        worker.submit(hello, pid + i)

    duration = time.time() - start
    print(count, "tasks submitted in", duration, "seconds.",
            duration / count, "seconds per task.")
if __name__ == '__main__':
    # This script is executable.  chmod +x it and you can run ./worker.py
    # to start a worker (consumer) process directly.
    consumer()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For the presentation slides, see: https://gist.github.com/amcgregor/4207375