Skip to content

Instantly share code, notes, and snippets.

@mayli
Created October 13, 2018 03:46
Show Gist options
  • Save mayli/2958817789e59d6ce34f9aa71a3cc43a to your computer and use it in GitHub Desktop.
Save mayli/2958817789e59d6ce34f9aa71a3cc43a to your computer and use it in GitHub Desktop.
import threading
import Queue
import shelve
from multiprocessing.pool import ThreadPool
import logging
# Root logging setup: DEBUG and above, each record tagged with the timestamp
# and the name of the emitting thread.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s:[%(threadName)s] %(levelname)s - %(message)s'
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
class Pipeline(object):
    """Multi-stage task pipeline backed by one thread pool per stage.

    Every item is handed to ``tasks[0]``; the return value of stage ``i`` is
    fed to stage ``i + 1`` and the result of the last stage is put on
    ``self.out``.  Items that are in flight are recorded per stage in
    ``self.state`` (a ``shelve`` file by default) so that ``resume`` can
    submit them again.
    """

    # Guards every read-modify-write of ``state``.  This is a class
    # attribute, so the lock is shared by all Pipeline instances.
    state_lock = threading.Lock()

    def __init__(self, tasks, processes=1, maxsize=1, state="o.state"):
        """Create one ``ThreadPool`` per task and open the state store.

        Args:
            tasks: sequence of one-argument callables, one per stage.
            processes: number of worker threads in each stage's pool.
            maxsize: accepted for backward compatibility; not used.
            state: filename passed to ``shelve.open``.  A falsy value keeps
                the in-flight bookkeeping in a plain in-memory dict instead
                of leaving ``self.state`` undefined.
        """
        self.tasks = tasks
        self.pools = [ThreadPool(processes) for _ in range(len(tasks))]
        self.out = Queue.Queue()
        self.state = shelve.open(state) if state else {}

    def resume(self):
        """Submit again every item that is still recorded as in flight.

        Intended to be called on a fresh pipeline before ``run``; items that
        are currently executing would otherwise be submitted twice.
        """
        # Snapshot under the lock, then submit outside of it: the lock is not
        # reentrant and the callbacks fired by the pools need to take it.
        with self.state_lock:
            pending = [(int(key), list(items))
                       for key, items in self.state.items()]
        for stage, items in pending:
            for item in items:
                # The item is already recorded for this stage, so it is
                # submitted without being logged a second time.
                self._submit(stage, item)

    def state_log(self, i, item):
        """Record ``item`` as in flight at stage ``i``."""
        key = str(i)
        with self.state_lock:
            items = self.state.get(key, [])
            # Assign a new list so that a shelf persists the change.
            self.state[key] = items + [item]

    def state_done(self, i, item):
        """Remove one occurrence of ``item`` from stage ``i``'s in-flight list.

        An item that is not recorded is ignored rather than raising.
        """
        key = str(i)
        with self.state_lock:
            items = list(self.state.get(key, []))
            try:
                items.remove(item)
            except ValueError:
                # This runs inside a pool callback, where an escaping
                # exception is not handled by the caller of run()/wait();
                # there is nothing to clear, so leave the record untouched.
                return
            self.state[key] = items

    def run(self, items, wait=True):
        """Feed ``items`` into the first stage.

        Args:
            items: iterable of inputs for ``tasks[0]``.
            wait: when true, block until every pool has been joined.

        Returns:
            The output queue (``self.out``).
        """
        # Explicit loop instead of map(): the calls are made for their side
        # effects and must not depend on map() being eager.
        for item in items:
            self.execute(0, item)
        if wait:
            self.wait()
        return self.out

    def wait(self):
        """Close and join the pools in stage order.

        Pools are closed here, so no further items can be submitted to this
        pipeline afterwards.
        """
        for i, pool in enumerate(self.pools):
            # Single pre-formatted argument: prints the same text whether
            # ``print`` is a statement or a function.
            print("waiting for pool %s %s" % (i, pool))
            pool.close()
            pool.join()

    def execute(self, i, item):
        """Record ``item`` as in flight at stage ``i`` and submit it.

        Args:
            i: index into ``self.tasks`` / ``self.pools``.
            item: argument passed to the stage's task.
        """
        self.state_log(i, item)
        self._submit(i, item)

    def _submit(self, i, item):
        """Queue ``item`` on stage ``i``'s pool without touching the state."""
        is_last = i == len(self.tasks) - 1

        def callback(result):
            # Hand the result on first and only then clear the bookkeeping
            # for this stage, so the item is always recorded somewhere while
            # it is still travelling through the pipeline.
            if is_last:
                self.out.put(result)
            else:
                self.execute(i + 1, result)
            self.state_done(i, item)

        self.pools[i].apply_async(self.tasks[i], (item,), callback=callback)

    def close(self):
        """Close the state store if it provides a ``close`` method."""
        closer = getattr(self.state, "close", None)
        if closer is not None:
            with self.state_lock:
                closer()
import time
def sleep(item):
    """Demo task: log the item, pause one second, append a timestamp.

    Returns ``item`` followed by a comma and the current ``time.time()``
    value rendered with ``str``.
    """
    logger.debug("processing %s", item)
    time.sleep(1)
    stamp = str(time.time())
    return item + "," + stamp
def test():
    """Demo run: push each character of "abcdefg" through four ``sleep`` stages.

    Blocks until the pipeline has finished, then prints the container that
    backs the output queue.
    """
    stages = [sleep] * 4
    pipe = Pipeline(stages)
    out = pipe.run("abcdefg")
    # Parenthesised single argument prints the same text under the print
    # statement used elsewhere in this file.
    print(out.queue)
# Runs the demo as soon as the module is loaded. There is no __main__ guard,
# so importing this file runs it as well.
test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment