Skip to content

Instantly share code, notes, and snippets.

@akayj
Last active September 9, 2016 17:09
Show Gist options
  • Save akayj/b184f703473739d5afb4 to your computer and use it in GitHub Desktop.
Save akayj/b184f703473739d5afb4 to your computer and use it in GitHub Desktop.
timeout implemented by thread
import threading
from functools import wraps
from Queue import Queue, Empty
class Timeout(object):
def __init__(self, seconds):
self.seconds = seconds
self.q = Queue(1)
def __call__(self, fn):
@wraps(fn)
def wrapper(*a, **kw):
def t_fn(q):
ret = fn(*a, **kw)
print 'ret = ', ret
self.q.put(ret)
t = threading.Thread(target=t_fn, args=(self.q,))
t.daemon = True
t.start()
try:
# t.join(self.seconds)
# return self.q.get_nowait()
ret = self.q.get(timeout=self.seconds)
return ret
except Empty:
print 'It is Empty!'
ret = None
return ret
return wrapper
def timeout(seconds):
def decorator(fn):
@wraps(fn)
def wrapper(*a, **kw):
def _fn(q):
ret = fn(*a, **kw)
q.put(ret)
q = Queue(1)
t = threading.Thread(target=_fn, args=(q,))
t.daemon = True
t.start()
try:
return q.get(timeout=seconds)
except Empty:
ret = None
return ret
return wrapper
return decorator
@timeout(1.5)
def sum(a, b):
import time
time.sleep(1)
return a + b
print sum(1, 2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment