Skip to content

Instantly share code, notes, and snippets.

@pankajthekush
Last active November 10, 2022 10:22
Show Gist options
  • Save pankajthekush/9fcaca4847124a89b42d4be5d7a2c344 to your computer and use it in GitHub Desktop.
Save pankajthekush/9fcaca4847124a89b42d4be5d7a2c344 to your computer and use it in GitHub Desktop.
Run a method for n seconds
class FuncTimer():
def __init__(self,fn,args,runtime):
self.fn = fn
self.args = args
self.queue = multiprocessing.Queue()
self.runtime = runtime
self.process = multiprocessing.Process(target=self.thread_caller)
def thread_caller(self):
with ThreadPoolExecutor() as executor:
future = executor.submit(self.fn, *self.args)
self.queue.put(future.result())
def __enter__(self):
return self
def start_run(self):
self.process.start()
self.process.join(timeout=self.runtime)
# wait for runtime then get qsize
if self.queue.qsize():
try:
out_res = self.queue.get(timeout=10)
except Exception:
out_res = None
else:
out_res = None
return out_res
def __exit__(self, exc_type, exc_value, exc_traceback):
if self.process.is_alive():
self.process.kill()
# calling class with very less runtime, it should be killed and return none
>>> from test import FuncTimer,worker_1
>>> with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp:
... res = fp.start_run()
... print(res)
...
very time consuming sleep
very time consuming sleep
very time consuming sleep
very time consuming sleep
very time consuming sleep
None
# calling class with increased runtime it , should return the calculation
>>> from test import FuncTimer,worker_1
>>> with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 15) as fp:
... res = fp.start_run()
... print(res)
...
very time consuming sleep
very time consuming sleep
very time consuming sleep
very time consuming sleep
very time consuming sleep
very time consuming sleep
very time consuming sleep
very time consuming sleep
very time consuming sleep
very time consuming sleep
6
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment