Skip to content

Instantly share code, notes, and snippets.

@bergundy
Created November 14, 2012 01:05
Show Gist options
  • Save bergundy/4069540 to your computer and use it in GitHub Desktop.
Save bergundy/4069540 to your computer and use it in GitHub Desktop.
import motor
import toro
import time
import logging
from tornado import ioloop, gen
def sleep(seconds, io_loop=None):
    """Return a ``gen.Task`` wrapping an IOLoop timeout ``seconds`` from now.

    The deadline is computed from ``time.time()`` at call time. When
    ``io_loop`` is falsy the global ``IOLoop.instance()`` is used instead.
    """
    loop = ioloop.IOLoop.instance() if not io_loop else io_loop
    deadline = time.time() + seconds
    return gen.Task(loop.add_timeout, deadline)
class AsyncWriterMixin(object):
    """Mixin that runs queued operations one at a time through ``motor.Op``.

    Work is submitted with :meth:`enqueue` and consumed in order by the
    loop started in :meth:`initialize_writer`. When an operation raises,
    everything still pending is discarded, new work is rejected for
    ``sync_delay`` seconds, and ``self.sync`` is then enqueued.

    The host class must provide a ``sync`` attribute that can be passed to
    ``motor.Op``; it is only looked up after a failed operation.
    """

    def initialize_writer(self, sync_delay=5, io_loop=None):
        """Set up the queue state and start the background writer loop.

        ``sync_delay`` is the number of seconds the queue stays suspended
        after a failed operation. ``io_loop`` falls back to the global
        ``IOLoop.instance()`` when falsy.
        """
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.sync_delay = sync_delay
        self._queue_suspended = False
        self._reset_queue()
        self._start_writer()

    def enqueue(self, *args, **kwargs):
        """Queue ``motor.Op(*args, **kwargs)`` for execution by the writer.

        The call is silently dropped while the queue is suspended.
        """
        if not self.queue_suspended:
            self._queue.put((args, kwargs))

    def _reset_queue(self):
        """Replace the queue with a fresh one, discarding pending items."""
        self._queue = toro.Queue()

    @property
    def queue_suspended(self):
        """True while the writer is waiting out ``sync_delay`` after a failure."""
        return self._queue_suspended

    @gen.engine
    def _start_writer(self):
        """Consume queued operations forever, recovering after failures."""
        while True:
            args, kwargs = yield gen.Task(self._queue.get)
            try:
                yield motor.Op(*args, **kwargs)
            except Exception:
                # Catch Exception rather than using a bare ``except`` so that
                # GeneratorExit, KeyboardInterrupt and SystemExit propagate.
                # This handler yields, and a generator that swallows
                # GeneratorExit and then yields makes close() raise
                # RuntimeError.
                logging.exception('Exception caught, out of sync while trying to run (%r, %r)', args, kwargs)
                self._queue_suspended = True
                # Drop whatever was queued behind the failed operation.
                self._reset_queue()
                yield sleep(self.sync_delay, io_loop=self.io_loop)
                self._queue_suspended = False
                # First item on the fresh queue is the host's sync operation.
                self.enqueue(self.sync)
import mock
from tornado.testing import AsyncTestCase
from async_writer_mixin import AsyncWriterMixin
class MongoQueue(AsyncWriterMixin):
    """Minimal AsyncWriterMixin host for the tests; ``sync`` is a Mock."""

    def __init__(self, *args, **kwargs):
        # The writer only looks up ``sync`` after a failed operation, so
        # creating the mock before starting the writer is equivalent.
        self.sync = mock.Mock()
        self.initialize_writer(*args, **kwargs)
class AsyncWriterMixinTest(AsyncTestCase):
    """Exercises AsyncWriterMixin through the MongoQueue test host."""

    def _build_queue(self, **options):
        """Create a MongoQueue bound to this test case's IOLoop."""
        return MongoQueue(io_loop=self.io_loop, **options)

    def test_initialize_writer(self):
        """Explicit arguments are stored and the queue starts unsuspended."""
        queue = self._build_queue(sync_delay=0.001)
        self.assertEqual(queue.io_loop, self.io_loop)
        self.assertEqual(queue.sync_delay, 0.001)
        self.assertFalse(queue.queue_suspended)

    def test_initialize_writer_defaults(self):
        """Without ``sync_delay`` the default of 5 is used."""
        queue = self._build_queue()
        self.assertEqual(queue.io_loop, self.io_loop)
        self.assertEqual(queue.sync_delay, 5)
        self.assertFalse(queue.queue_suspended)

    def return_result(self, callback):
        """Operation stub: report success, then release ``self.wait()``."""
        callback('ok', None)
        self.stop()

    def raise_exception(self, callback):
        """Operation stub: report an error, then release ``self.wait()``."""
        callback(None, Exception('xxx'))
        self.stop()

    def test_enqueue(self):
        """A successful operation leaves the queue unsuspended."""
        queue = self._build_queue(sync_delay=0.001)
        queue.enqueue(self.return_result)
        self.wait()  # resumes once the queued operation has been invoked
        self.assertFalse(queue.queue_suspended)

    def test_queue_suspended(self):
        """A failure drops pending work, suspends the queue, then syncs."""
        queue = self._build_queue(sync_delay=0.001)
        dropped_task = mock.Mock(side_effect=self.return_result)

        queue.enqueue(self.raise_exception)
        queue.enqueue(dropped_task)  # discarded when the queue is reset
        self.wait()  # resumes once raise_exception has been invoked
        self.assertTrue(queue.queue_suspended)

        queue.enqueue(dropped_task)  # rejected while the queue is suspended
        queue.sync.side_effect = self.return_result
        self.wait()  # resumes once sync has been invoked

        dropped_calls = len(dropped_task.mock_calls)
        sync_calls = len(queue.sync.mock_calls)
        self.assertEqual(dropped_calls, 0)
        self.assertEqual(sync_calls, 1)
        self.assertFalse(queue.queue_suspended)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment