Skip to content

Instantly share code, notes, and snippets.

@tomotaka
Created September 4, 2013 08:32
Show Gist options
  • Save tomotaka/6434293 to your computer and use it in GitHub Desktop.
Save tomotaka/6434293 to your computer and use it in GitHub Desktop.
gevent greenlet thread pool
#!/usr/bin/python
# -*- coding: utf-8 -*-
import gevent
from gevent.event import AsyncResult
from gevent.queue import Queue
__all__ = ('GExecutorNotRunningError', 'GExecutor')
class GExecutorNotRunningError(Exception):
pass
class GExecutor(object):
def __init__(self, func, thread_num=10):
"""
Note:
- function 'func' will only take one parameter.
"""
self.thread_num = thread_num
self.threads = []
queue = Queue()
self.queue = queue
# thread: repeat wait=>process=>return
def computing_thread():
while True:
data, async_result = queue.get() # block: wait submit(data) called
if async_result is None:
break # async_result==None: termination signal
result = func(data)
async_result.set(result)
# start threads
for i in range(thread_num):
gthread = gevent.spawn(computing_thread)
self.threads.append(gthread)
self.is_running = True
def shutdown(self):
self.is_running = False
for i in range(self.thread_num):
self.queue.put((None, None)) # send termination signal to threads
gevent.joinall(self.threads)
def submit(self, data):
"""returns AsyncResult object.
---
def multiply(n):
return n * 3
executor = GExecutor(func=multiply, thread_num=10)
future = executor.submit(10)
result = future.get()
---
future.get() will block until the computation finishes."""
if not self.is_running:
raise GExecutorNotRunningError
async_result = AsyncResult()
self.queue.put((data, async_result))
return async_result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment