Skip to content

Instantly share code, notes, and snippets.

@ksdme
Created August 15, 2020 18:57
Show Gist options
  • Save ksdme/d8066b77d4bd3ed76d8070312d763b5f to your computer and use it in GitHub Desktop.
Save ksdme/d8066b77d4bd3ed76d8070312d763b5f to your computer and use it in GitHub Desktop.
from time import sleep
from functools import wraps
from multiprocessing import Manager
from multiprocessing import Process
def with_timeout(duration):
def _decorator_base(func):
def _func_worker(*args, returns, **kwargs):
returns.append(func(*args, **kwargs))
@wraps(func)
def _func_with_timeout(*args, **kwargs):
manager = Manager()
returns = manager.list()
kwargs['returns'] = returns
process = Process(target=_func_worker, args=(args, kwargs))
process.start()
process.join(duration)
if process.is_alive():
process.terminate()
process.join()
raise TimeoutError('Timed Out')
if len(returns) > 0:
return returns[0]
return _func_with_timeout
return _decorator_base
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment