Skip to content

Instantly share code, notes, and snippets.

@masroore
Created August 5, 2014 05:46
Show Gist options
  • Save masroore/ef9eaa3c5794dfa5f4b2 to your computer and use it in GitHub Desktop.
class DocumentProcessor(object):
    """Fetch documents by id on a pool of threads and stream them to a file.

    Ids passed to :meth:`add_doc_id` are batched into tasks of
    ``WORKER_TASK_SIZE`` ids.  Each worker thread takes a task from
    ``task_queue``, looks the ids up through the module-level ``db`` handle
    and pushes every document's ``'q'`` value onto ``result_queue``.  A single
    saver thread drains ``result_queue`` and writes the values to
    ``result_file``, one per line.

    ``None`` is the stop sentinel on both queues.  :meth:`shutdown` must be
    called to flush pending ids, stop the (non-daemon) threads and close the
    output file.
    """

    WORKER_TASK_SIZE = 100    # ids batched into one worker task
    SAVER_CHUNK_LIMIT = 1000  # results buffered before each file write
    WORKER_POOL_SIZE = 10     # number of worker threads

    def __init__(self, result_file):
        """Create the queues and start the worker and saver threads.

        :param result_file: writable file-like object; closed by
            :meth:`shutdown`.
        """
        self.result_file = result_file
        self.id_cache = []
        self.task_queue = Queue()
        self.result_queue = Queue()
        # Initialise the counter *before* any thread that mutates it exists.
        self.documents_loaded = 0
        self.spawn_workers()

    def worker(self, name, task_queue, result_queue):
        """Thread body: resolve id batches into ``'q'`` values.

        :param name: label assigned to this worker (not used by the loop).
        :param task_queue: queue of id lists; ``None`` ends the loop.
        :param result_queue: queue receiving each document's ``'q'`` value.
        """
        while True:
            task = task_queue.get()
            if task is None:
                break
            # NOTE(review): ``db`` is defined outside this class; the call
            # looks like a MongoDB ``find(filter, projection)`` -- confirm.
            for doc in db.query.find({'_id': {'$in': task}}, {'q': 1}):
                value = doc['q']
                # ``None`` is the saver's stop sentinel; forwarding a null
                # value would make the saver exit early and drop every
                # result queued after it.
                if value is not None:
                    result_queue.put(value)

    def saver(self, result_queue, result_file):
        """Thread body: write results to ``result_file`` in chunks.

        Increments ``self.documents_loaded`` once per result consumed.

        :param result_queue: queue of strings; ``None`` ends the loop.
        :param result_file: object with a ``write`` method.
        """
        chunk = []
        while True:
            result = result_queue.get()
            if result is None:
                break
            self.documents_loaded += 1
            chunk.append(result)
            if len(chunk) >= self.SAVER_CHUNK_LIMIT:
                result_file.write('\n'.join(chunk) + '\n')
                chunk = []
        # Flush whatever is left below the chunk limit.
        if chunk:
            result_file.write('\n'.join(chunk) + '\n')

    def spawn_workers(self):
        """Start ``WORKER_POOL_SIZE`` worker threads and one saver thread."""
        self.worker_pool = []
        # ``range`` instead of ``xrange`` keeps this runnable on Python 2 and 3.
        for index in range(self.WORKER_POOL_SIZE):
            name = 'worker-%d' % index
            thread = Thread(
                target=self.worker,
                args=[name, self.task_queue, self.result_queue],
            )
            self.worker_pool.append(thread)
            thread.start()
        self.saver_thread = Thread(
            target=self.saver,
            args=[self.result_queue, self.result_file],
        )
        self.saver_thread.start()

    def add_doc_id(self, id_):
        """Buffer one id; enqueue a task once ``WORKER_TASK_SIZE`` are held.

        :param id_: document id to look up.
        """
        self.id_cache.append(id_)
        if len(self.id_cache) >= self.WORKER_TASK_SIZE:
            self.task_queue.put(self.id_cache)
            self.id_cache = []

    def shutdown(self):
        """Flush pending ids, stop all threads and close the result file."""
        if self.id_cache:
            self.task_queue.put(self.id_cache)
            self.id_cache = []
        # One sentinel per worker so each of them leaves its loop.
        for _ in self.worker_pool:
            self.task_queue.put(None)
        for worker in self.worker_pool:
            worker.join()
        # Only after every worker has finished can no more results arrive,
        # so the saver may now be told to stop.
        self.result_queue.put(None)
        self.saver_thread.join()
        self.result_file.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment