Skip to content

Instantly share code, notes, and snippets.

@fennecinspace
Created December 31, 2018 00:30
Show Gist options
  • Save fennecinspace/b635434a24c71a22b1d0e43ceee12026 to your computer and use it in GitHub Desktop.
Save fennecinspace/b635434a24c71a22b1d0e43ceee12026 to your computer and use it in GitHub Desktop.
Timeout Thread
import threading
class TimedThread(threading.Thread):
def __init__(self, fct, *args , timeout = 30, timeout_exception = True, **kwargs):
super().__init__(target = fct, args = args, kwargs = kwargs)
self.fct = fct
self.result = None
self.timeout = timeout
self.timeout_exception = timeout_exception
def run(self):
if self._target:
self.result = self._target(*self._args, **self._kwargs)
def start(self, daemon = True):
self.daemon = daemon
super().start()
self.join(self.timeout)
if self.is_alive():
if self.timeout_exception:
raise TimeoutError()
return self.result
def timeout(timeout, timeout_exception = True):
def timeout_decorator(fct):
def wrapper(*args, **kwargs):
t = TimedThread(fct, timeout = timeout, timeout_exception = timeout_exception, *args, **kwargs)
t.start()
return t.result
return wrapper
return timeout_decorator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment