Skip to content

Instantly share code, notes, and snippets.

@BibMartin
Created November 29, 2017 06:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BibMartin/4381442209825c5a3d57f6b4a3c2acca to your computer and use it in GitHub Desktop.
Save BibMartin/4381442209825c5a3d57f6b4a3c2acca to your computer and use it in GitHub Desktop.
Using multiprocessing to run a blocking function with a timeout.
from tornado import ioloop, gen, concurrent
import pandas as pd
import time
import asyncio
from multiprocessing import TimeoutError, Process
from multiprocessing.dummy import Pool as ThreadPool
class F(object):
def __init__(self):
self.io_loop = ioloop.IOLoop()
def run_yield(self):
p = ThreadPool(1)
res = p.apply_async(self.run)
res.wait(3.)
if res.ready():
out = res.get()
open('foo.log', 'a').write('SUCCESS\n')
print('SUCCESS', out)
else:
open('foo.log', 'a').write('TIMEOUT\n')
print('TIMEOUT')
p.terminate()
def run(self, x=None):
for i in range(12):
open('foo.log', 'a').write(
'{}\n'.format(pd.Timestamp.utcnow()))
time.sleep(1)
return 'Finished'
f = F()
open('foo.log', 'a').write("======\n")
f.run_yield()
# Efforts to run it async:...
# ===========================
class G(object):
def __init__(self):
self.io_loop = ioloop.IOLoop()
self.executor = concurrent.futures.ProcessPoolExecutor()
# self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
#@concurrent.run_on_executor()
@gen.coroutine
def run_yield(self):
self.p = ThreadPool(1)
res = self.p.apply_async(self.run)
open('foo.log', 'a').write('will wait...\n')
yield (asyncio.sleep(3.))
open('foo.log', 'a').write('have waited...\n')
# res.wait(3.)
if res.ready():
out = res.get()
open('foo.log', 'a').write('SUCCESS\n')
print('SUCCESS', out)
else:
open('foo.log', 'a').write('TIMEOUT\n')
print('TIMEOUT')
self.p.terminate()
#self.executor.map(self.run, [1], timeout=3.)
#self.run()
#p = ThreadPool(1)
#res = p.apply_async(self.run)
#res.wait(3.)
#if res.ready():
# out = res.get(1.)
# print('SUCCESS', out)
#else:
# print('TIMEOUT')
#p.terminate()
#try:
# out = res.get(3.) # Wait timeout seconds for func to complete.
# return out
#except TimeoutError:
# print("Aborting due to timeout")
# p.terminate()
def run_yield0(self):
p = ThreadPool(1)
res = p.apply_async(self.run)
res.wait(3.)
if res.ready():
out = res.get(1.)
open('foo.log', 'a').write('SUCCESS\n')
print('SUCCESS', out)
else:
open('foo.log', 'a').write('TIMEOUT\n')
print('TIMEOUT')
p.terminate()
def run(self, x=None):
for i in range(12):
open('foo.log', 'a').write(
'{}\n'.format(pd.Timestamp.utcnow()))
time.sleep(1)
return 'Finished'
# async def foo(self):
# while True:
# x = await sleep_fun()
# open('foo.log', 'a').write('{}\n'.format(x))
def print(self):
open('foo.log', 'a').write('PRINT\n')
def stop(self):
open('foo.log', 'a').write('STOP\n')
self.io_loop.stop()
# self.executor.shutdown(wait=True)
#f.io_loop.run_sync(f.run_yield)
#f.io_loop.start()
#loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
#loop.run_until_complete(f.run_yield())
#loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment