Skip to content

Instantly share code, notes, and snippets.

@rambo
Created January 20, 2019 10:10
Show Gist options
  • Save rambo/8f46faf25df25a79f2cb7cf7ae32f432 to your computer and use it in GitHub Desktop.
Save rambo/8f46faf25df25a79f2cb7cf7ae32f432 to your computer and use it in GitHub Desktop.
concurrent.futures example where futures can generate more work to do
from concurrent.futures import ProcessPoolExecutor
import time
import random
def long_task(sleeptime):
print("Sleeping {}".format(sleeptime))
time.sleep(sleeptime)
if sleeptime < 1:
return []
if random.random() > 0.7:
return [ sleeptime*0.5, sleeptime*2 ]
return [ sleeptime*0.5 ]
class DazMainThing:
def __init__(self):
self.executor = ProcessPoolExecutor()
self.running_futures = []
def future_done(self, fut):
self.running_futures.remove(fut)
res = fut.result()
print("future result {}".format(res))
for st in res:
self.add_long_task(st)
def add_long_task(self, sleeptime):
print("Adding new task")
fut = self.executor.submit(long_task, sleeptime)
fut.add_done_callback(self.future_done)
self.running_futures.append(fut)
def executor_shutdown(self):
print("Waiting for tasks to complete")
# This could also be done by just passing wait=True to the shutdown method but if you want to do something special while waiting
if self.running_futures:
while True:
all_done = True
for sf in self.running_futures:
if not sf.done():
all_done = False
break
if all_done:
break
self.executor.shutdown()
self.executor = None
self.running_futures = []
if __name__ == '__main__':
ins = DazMainThing()
ins.add_long_task(5)
# Wait for tasks to complete (in reality this would be mainloop of the real application)
while True:
time.sleep(1)
if not ins.running_futures:
break
# This will again make sure there are no leftover tasks
ins.executor_shutdown()
@rambo
Copy link
Author

rambo commented Jan 20, 2019

Forgot example output

rambombp2015-674:Downloads rambo$ python3.7 future_return_test.py 
Adding new task
Sleeping 5
future result [2.5, 10]
Adding new task
Adding new task
Sleeping 2.5
Sleeping 10
future result [1.25]
Adding new task
Sleeping 1.25
future result [0.625]
Adding new task
Sleeping 0.625
future result []
future result [5.0]
Adding new task
Sleeping 5.0
future result [2.5]
Adding new task
Sleeping 2.5
future result [1.25]
Adding new task
Sleeping 1.25
future result [0.625]
Adding new task
Sleeping 0.625
future result []
Waiting for tasks to complete
rambombp2015-674:Downloads rambo$ 

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment