Skip to content

Instantly share code, notes, and snippets.

@meredian
Last active December 16, 2016 10:52
Show Gist options
  • Save meredian/98ef915d43d398eb174b91b4aae35457 to your computer and use it in GitHub Desktop.
Save meredian/98ef915d43d398eb174b91b4aae35457 to your computer and use it in GitHub Desktop.
class AsyncMapState(object):
    """Run ``func`` over ``items`` in a worker pool and expose progress.

    The job is submitted with ``Pool.map_async`` as soon as the object is
    created. ``report()`` prints progress and tells whether the job has
    finished; ``get()`` returns the results once they are ready.

    ``Pool`` is expected to be ``multiprocessing.Pool`` (or a compatible
    pool class) already in scope in this module.
    """

    def __init__(self, func, items, **kwargs):
        """Start processing ``items`` with ``func``.

        Args:
            func: Callable applied to every item by the pool workers.
            items: Iterable of inputs. Iterables without ``__len__``
                (e.g. generators) are materialized into a list first.
            **kwargs: ``procs`` sets the number of worker processes
                (default 6). Other keys are ignored.
        """
        # The item count is needed for progress reporting, so an unsized
        # iterable has to be turned into a list before it is submitted.
        if not hasattr(items, '__len__'):
            items = list(items)
        self.size = len(items)
        # Defined up front: the completion callback is not invoked for an
        # empty job, and get() must still be able to read this attribute.
        self.stop_time = None
        self.pool = Pool(kwargs.pop('procs', 6))
        # Take the start timestamp before submitting so that the callback
        # can never observe a finished job with no start time.
        self.start_time = time.time()
        # chunksize=1 makes the pool's chunk counter equal to an item counter.
        self.async_result = self.pool.map_async(func, items, 1, self.stop)
        # No further tasks will be submitted; workers exit when done.
        self.pool.close()

    def item_status(self):
        """Return ``(items_processed, time_spent, items_left, time_left)``.

        ``time_left`` is a linear extrapolation from the average time per
        processed item so far.
        """
        time_spent = time.time() - self.start_time
        # NOTE: _number_left is a private attribute of the async result
        # object; with chunksize=1 it is the number of unfinished items.
        items_left = self.async_result._number_left
        items_processed = self.size - items_left
        # max(..., 1) avoids dividing by zero before the first item is done.
        time_left = time_spent * items_left / max(items_processed, 1)
        return (items_processed, time_spent, items_left, time_left)

    def stop(self, result):
        """Completion callback for ``map_async``: record the finish time."""
        self.stop_time = time.time()

    def report(self):
        """Print the current state; return True when the result is ready."""
        if self.async_result.ready():
            print("Result is ready, use get() to check it")
            return True
        (items_done, time_done, items_left, time_left) = self.item_status()
        print("Already {0:} items ({1:.2f}%) processed for {2:.1f} sec".format(
            items_done, 100.0 * items_done / self.size, time_done
        ))
        print("Still calculating, {0:} items left, est. time {1:.1f} sec".format(
            items_left, time_left
        ))
        return False

    def get(self):
        """Return the list of results, or None if the job is still running.

        Exceptions surfaced by ``async_result.get()`` propagate to the caller.
        """
        if not self.async_result.ready():
            print("Job is still running, use report() to check state")
            return None
        res = self.async_result.get()
        if self.stop_time is None:
            # The callback never ran (e.g. empty input); use "now" and keep
            # it so repeated calls report the same duration.
            self.stop_time = time.time()
        print("Job ready, {0:} items processed for {1:.1f} sec".format(
            self.size, self.stop_time - self.start_time
        ))
        return res
def sleep_and_square(x):
    """Sleep a random whole number of seconds from 0 to ``x``, then return ``x * x``.

    Serves as an artificially slow task for the demo run in this file.
    """
    delay_seconds = random.randint(0, x)
    time.sleep(delay_seconds)
    squared = x * x
    return squared
# Demo: square the numbers 0..9 in the pool, polling progress every 2 seconds.
# The __main__ guard keeps the pool from being started when this module is
# imported; that includes the re-import done by worker processes on platforms
# where multiprocessing spawns fresh interpreters.
if __name__ == "__main__":
    test_async = AsyncMapState(sleep_and_square, range(10))
    while not test_async.report():
        time.sleep(2)
    test_async.get()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment