

By @amcgregor, created December 4, 2012 18:46
An overview of a lightweight, high-performance, scalable deferred-task (RPC) system.

MongoDB as RPC

  • Needed for timed game events.
  • Needed because Facebook’s API is slow.
  • Background task execution.
  • Immediate or scheduled.
  • Captures return values and exceptions.
  • Acts as a state machine with atomic locking.
    • Uses the “update if current” mechanism.
  • Stores job data in a MongoDB collection.
    • Easy to scale via sharding and replication.
  • Push notifications via a capped collection.
    • The same mechanism MongoDB uses internally for replication (the oplog).
  • Uses Futures for parallel execution of tasks.
  • Uses APScheduler for timed execution of tasks.
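The “update if current” locking described above can be modeled without a database. This is a minimal in-memory sketch, not the author's implementation: `JobStore`, `update_if_current`, and `try_acquire` are hypothetical names, and a `threading.Lock` stands in for MongoDB's guarantee that a single-document update is atomic. Eight workers race through a `concurrent.futures` thread pool to claim one pending job; because the update only applies while its precondition still holds, exactly one succeeds.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class JobStore:
    """Hypothetical in-memory stand-in for the jobs collection."""

    def __init__(self):
        self._lock = threading.Lock()  # Emulates MongoDB's single-document atomicity.
        self._jobs = {}

    def insert(self, job_id, **fields):
        with self._lock:
            self._jobs[job_id] = dict(fields)

    def update_if_current(self, job_id, expected, changes):
        """Apply `changes` only if every field in `expected` still matches."""
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None or any(job.get(k) != v for k, v in expected.items()):
                return False  # Someone else changed the job first.
            job.update(changes)
            return True

    def get(self, job_id):
        with self._lock:
            return dict(self._jobs[job_id])


def try_acquire(store, job_id, worker):
    # Precondition: still pending and unowned. Transition: running, owned by us.
    return store.update_if_current(
        job_id,
        expected={"state": "pending", "owner": None},
        changes={"state": "running", "owner": worker},
    )


store = JobStore()
store.insert("job-1", state="pending", owner=None)
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(lambda w: try_acquire(store, "job-1", w), range(8)))
print(results.count(True))  # 1
```

The same precondition (`state` is `pending` and `owner` is unset) is what the handler passes to MongoDB as its update filter.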

MongoDB as a Low-Latency Queue

  • Capped collections have a fixed maximum size and, optionally, a maximum document count.
  • Ring-buffer design: documents are kept in insertion order and the oldest are overwritten when the collection is full. Updates are allowed only if the resulting document is the same size or smaller.
  • Used by MongoDB internally for multi-server replication.
  • Tailable cursors provide long-poll push, similar to IMAP IDLE.
    • Workers are handed each newly inserted record immediately.
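The capped-collection behavior listed above can be modeled in-process to show what a tailing worker sees. In this hypothetical sketch (`CappedLog` and `tail` are illustrative names), a `deque` with `maxlen` gives the fixed-size, insertion-ordered ring buffer, and `tail()` blocks on a condition variable until a newer record arrives, returning once a timeout passes with no data, loosely like a tailable cursor with `await_data`. Real capped collections are limited by bytes as well as document count, which this model ignores.

```python
import threading
from collections import deque


class CappedLog:
    """Hypothetical model of a capped collection: fixed size, insertion order."""

    def __init__(self, max_docs):
        self._docs = deque(maxlen=max_docs)  # Oldest entry drops off when full.
        self._seq = 0  # Monotonic position, standing in for insertion order.
        self._cond = threading.Condition()

    def insert(self, doc):
        with self._cond:
            self._seq += 1
            self._docs.append((self._seq, doc))
            self._cond.notify_all()  # Wake any waiting tailers ("push").

    def snapshot(self):
        with self._cond:
            return [doc for _, doc in self._docs]

    def tail(self, timeout=1.0):
        """Return an iterator over documents inserted after this call."""
        with self._cond:
            start = self._seq  # Position captured now, not on first next().
        return self._follow(start, timeout)

    def _follow(self, last, timeout):
        while True:
            with self._cond:
                if not self._cond.wait_for(lambda: self._seq > last, timeout):
                    return  # Await period expired with no new data.
                new = [(s, d) for s, d in self._docs if s > last]
            for seq, doc in new:  # Yield outside the lock.
                last = seq
                yield doc


log = CappedLog(max_docs=3)
for i in range(5):
    log.insert({"n": i})
print(log.snapshot())  # [{'n': 2}, {'n': 3}, {'n': 4}]
```

As with a real tailable cursor, a consumer whose iterator runs out simply calls `tail()` again; records evicted before it catches up are lost.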
Example job record in the jobs collection (mongo shell notation):

```javascript
{
    "_id" : ObjectId("4ea3717f9bfbb601d2000002"),
    "state" : "new", // Also: pending, dead, cancelled, running, finished
    "callable" : "c__builtin__\nprint\np1\n.",
    "args" : [ "Task", 0 ],
    "kwargs" : { },
    "created" : ISODate("2011-10-23T01:44:31.446Z"),
    "creator" : [ "Lucifer", 298, 466 ],
    "owner" : null, // Set when acquired, e.g. [ "Lucifer", 324, 456 ]
    // If scheduled, not immediate:
    "when" : ISODate("..."),
    // If in progress or completed:
    "acquired" : ISODate("..."),
    // If completed:
    "result" : null,
    "exception" : null,
    "completed" : ISODate("...")
}
```
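A job document shaped like the one above could be built from Python roughly as follows. `make_job` is a hypothetical helper, not part of the original code. It pickles the callable with protocol 0, which is ASCII for a module-level function reference and can therefore be stored as a string; the handler's `pickle.loads(job['callable'].encode('ascii'))` reverses it. Under Python 3 the stored text names `__builtin__` rather than `builtins`, because protocols below 3 map module names back to their Python 2 spellings by default.

```python
import pickle
from datetime import datetime, timezone


def make_job(fn, *args, when=None, creator=None, **kwargs):
    """Hypothetical helper: build a job document shaped like the example above."""
    job = {
        "state": "new",
        # Protocol 0 yields an ASCII pickle for module-level callables.
        "callable": pickle.dumps(fn, protocol=0).decode("ascii"),
        "args": list(args),
        "kwargs": kwargs,
        "created": datetime.now(timezone.utc),
        "creator": creator,
        "owner": None,
    }
    if when is not None:
        job["when"] = when  # Scheduled rather than immediate.
    return job


job = make_job(print, "Task", 0)
print(job["callable"].splitlines()[:2])  # ['c__builtin__', 'print']
```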
Messages in the capped notification collection:

```javascript
// Workaround for a MongoDB quirk: a tailable cursor on an empty capped
// collection is dead on arrival, so one placeholder document is inserted.
{ "_id" : ObjectId("4ea371629bfbb601c8000000"), "nop" : true }

{ // New job.
    "_id" : ObjectId("4ea371769bfbb601d2000001"),
    "job_id" : ObjectId("4ea371769bfbb601d2000000"),
    "creator" : [ "Lucifer", 298, 466 ]
}

{ // Finished job.
    "_id" : ObjectId("4ea371769bfbb601c8000001"),
    "job_id" : ObjectId("4ea371769bfbb601d2000000"),
    "creator" : [ "Lucifer", 324, 456 ],
    "result" : true
}
```
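On the submitting side, finished-job messages like the one above let a caller wait for a result without polling the jobs collection. A minimal sketch, assuming each message carries `job_id` and that a finished message is recognizable by its `result` key (an inference from the two examples, not something the source states): pending calls are tracked as `concurrent.futures.Future` objects keyed by job id, `nop` placeholders are ignored, and any other message is treated as a new-job notice. `ResultRouter` is a hypothetical name.

```python
from concurrent.futures import Future


class ResultRouter:
    """Hypothetical: map job ids to Futures and resolve them from messages."""

    def __init__(self):
        self._waiting = {}

    def expect(self, job_id):
        """Register interest in a job and return a Future for its result."""
        # The stdlib documents direct Future construction as meant for testing;
        # it is used here only to keep the sketch short.
        return self._waiting.setdefault(job_id, Future())

    def handle(self, message):
        """Process one message; returns a label describing what happened."""
        if message.get("nop"):
            return "nop"  # Placeholder document; nothing to do.
        if "result" not in message:
            return "new"  # A worker would now try to acquire message["job_id"].
        future = self._waiting.pop(message["job_id"], None)
        if future is None:
            return "ignored"  # Nobody in this process is waiting on it.
        future.set_result(message["result"])
        return "finished"


router = ResultRouter()
pending = router.expect("job-1")
for msg in [{"nop": True}, {"job_id": "job-1"}, {"job_id": "job-1", "result": True}]:
    router.handle(msg)
print(pending.result(timeout=1))  # True
```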
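Consuming the capped collection with a tailable cursor (pymongo):

```python
from pymongo import CursorType, ReadPreference
from pymongo.errors import OperationFailure


def queue(collection, query=None):
    """Yield documents from a capped collection as they are inserted."""
    if collection.find_one() is None:
        # This is to prevent a terrible infinite busy loop while empty:
        # a tailable cursor over an empty capped collection dies immediately.
        collection.insert_one(dict(nop=True))

    # Equivalent of the old slave_ok=True: secondaries may serve the reads.
    reader = collection.with_options(read_preference=ReadPreference.SECONDARY_PREFERRED)
    last = None
    query = query or {}
    cursor = reader.find(query, cursor_type=CursorType.TAILABLE_AWAIT)

    while True:  # Primary retry loop.
        try:
            while cursor.alive:  # Inner record loop; may time out.
                for record in cursor:
                    if record['_id'] == last:
                        continue  # Already yielded before the cursor was re-created.
                    last = record['_id']
                    yield record
        except OperationFailure:
            pass  # Cursor was invalidated server-side; re-create it below.

        # Resume from the last record seen. $gte (rather than $gt) keeps at least one
        # match so the new cursor is not dead on arrival; the duplicate is skipped above.
        retry_query = {"_id": {"$gte": last}} if last is not None else {}
        retry_query.update(query)
        cursor = reader.find(retry_query, cursor_type=CursorType.TAILABLE_AWAIT)
```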
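Acquiring and running a job:

```python
import pickle
from datetime import datetime, timezone

from pymongo.errors import PyMongoError


class AcquireFailed(Exception):
    """Raised when this worker could not claim the job."""


def handler(self, job_id):
    # Method of a worker class with `jobs` (a pymongo Collection) and `identity` attributes.
    # Build the dictionary update.
    update = dict(acquired=datetime.now(timezone.utc), state="running",
                  owner=self.identity)
    try:
        # "Update if current": only matches while the job is still pending and unowned,
        # so at most one worker can claim it.
        result = self.jobs.update_one(dict(_id=job_id, state="pending", owner=None),
                                      {"$set": update})
    except PyMongoError as e:
        raise AcquireFailed() from e
    if result.matched_count == 0:
        raise AcquireFailed()  # Another worker claimed it first, or it is not pending.
    try:
        job = self.jobs.find_one(dict(_id=job_id),
                                 projection=['callable', 'args', 'kwargs'])
    except PyMongoError as e:  # This should, in theory, never happen unless MongoDB goes away.
        raise AcquireFailed() from e
    if job is None:
        raise AcquireFailed()
    # Only unpickle callables from trusted producers: pickle can execute arbitrary code.
    obj = pickle.loads(job['callable'].encode('ascii'))
    args = job.get('args', [])
    kwargs = job.get('kwargs', {})
    return obj(*args, **kwargs)
```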
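The handler stops at calling the job, while the overview says return values and exceptions are captured. A sketch of that final step, under stated assumptions: `run_and_record` is a hypothetical name; it turns the outcome into a `$set` document using the `state`, `result`, `exception`, and `completed` fields from the job record; it marks the job `finished` in both cases (the source does not say when `dead` is used); and it stores the exception as text, since arbitrary exception objects are not BSON-serializable.

```python
import traceback
from datetime import datetime, timezone


def run_and_record(fn, args=(), kwargs=None):
    """Hypothetical: call a job and build the $set update that records its outcome."""
    update = {"state": "finished", "result": None, "exception": None}
    try:
        update["result"] = fn(*args, **(kwargs or {}))
    except Exception as exc:
        # Keep a printable summary such as "ValueError: ..." instead of the object.
        update["exception"] = "".join(
            traceback.format_exception_only(type(exc), exc)).strip()
    update["completed"] = datetime.now(timezone.utc)
    return {"$set": update}


print(run_and_record(sum, ([1, 2, 3],))["$set"]["result"])  # 6
print(run_and_record(int, ("oops",))["$set"]["exception"])
```

Applying this update with the same update-if-current guard, for example filtering on `state: "running"` and this worker's `owner`, would keep a stale worker from overwriting another's result.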
Comments by the author, @amcgregor:

  • An actually working implementation: https://gist.github.com/amcgregor/52a684854fb77b6e7395
  • The implementation, modernized and packaged up: https://github.com/marrow/task
