Skip to content

Instantly share code, notes, and snippets.

@thodnev
Last active February 26, 2017 03:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thodnev/ed43f4bf921c5b5747e1b86f1cc68c4e to your computer and use it in GitHub Desktop.
Save thodnev/ed43f4bf921c5b5747e1b86f1cc68c4e to your computer and use it in GitHub Desktop.
'''A throughput-limiting message dispatcher for Telegram bots'''
import sys
import time
import threading
# The standard queue module was renamed between major versions:
# `Queue` on Python 2, `queue` on Python 3. Expose it as `q` either way.
if sys.version_info.major > 2:
    import queue as q
else:
    import Queue as q
# We need to measure sub-second intervals, so the most accurate timer
# available is needed.
# Starting from Python 3.3 there is time.perf_counter, the clock with the
# highest resolution available to the system, so use it there.
# In Python 2.7 there is no perf_counter yet, so fall back on what we have:
# on Windows the best available is time.clock, while on other platforms it
# is time.time (M. Lutz, "Learning Python," 4ed, p.630-634)
# The version is compared as a tuple so that any interpreter >= 3.3
# (including future major versions) picks perf_counter instead of falling
# through to the legacy branch.
if sys.version_info >= (3, 3):
    curtime = time.perf_counter
else:
    curtime = time.clock if sys.platform[:3] == 'win' else time.time
class DispatchError(RuntimeError):
    '''Indicates dispatching errors.

    Raised by `LimitedDispatcher.__call__` when a callback is submitted to
    a dispatcher whose thread is not running or has been asked to stop.
    '''
class LimitedDispatcher(threading.Thread):
    '''Dispatches callbacks from queue with specified throughput limits.

    Callbacks are submitted by calling the instance itself
    (``dispatcher(func, *args, **kwargs)``); they are put on a queue and
    executed one by one in the dispatcher's own thread, which sleeps as
    needed whenever `burst_limit` is reached within the `time_limit`
    window.
    '''
    _instcnt = 0  # instance counter, used to build default thread names

    def __init__(self, queue=None, burst_limit=30, time_limit_ms=1000,
                 exc_route=None, autostart=True, name=None):
        '''Creates a separate thread to dispatch callbacks with delays.

        Args:
            queue (:obj:`queue.Queue`, optional): used to pass callbacks to
                thread.
                Creates `queue.Queue` implicitly if not provided.
            burst_limit (:obj:`int`, optional): number of maximum callbacks
                to dispatch per time-window defined by `time_limit_ms`.
                Defaults to 30.
            time_limit_ms (:obj:`int`, optional): defines width of the
                time-window in which the dispatching limit is calculated.
                Defaults to 1000.
            exc_route (:obj:`callable`, optional): a callable, accepting 1
                positional argument; used to route exceptions from
                dispatcher thread to main thread; is called on `Exception`
                subclass exceptions.
                If not provided, exceptions are routed through dummy
                handler, which re-raises them.
            autostart (:obj:`bool`, optional): if True, dispatcher is
                started immediately after object's creation; if False,
                should be started manually by `start` method.
                Defaults to True.
            name (:obj:`str`, optional): thread's name.
                Defaults to ``'LimitedDispatcher-N'``, where N is sequential
                number of object created.
        '''
        self._queue = queue if queue is not None else q.Queue()
        self.burst_limit = burst_limit
        # Divide by a float: on Python 2 `500/1000` would truncate to 0 and
        # silently disable the time window.
        self.time_limit = time_limit_ms / 1000.0
        self.exc_route = (exc_route if exc_route is not None
                          else self._default_exception_handler)
        self.__exit_req = False  # flag to gently exit thread
        self.__class__._instcnt += 1
        if name is None:
            name = '%s-%s' % (self.__class__.__name__,
                              self.__class__._instcnt)
        super(LimitedDispatcher, self).__init__(name=name)
        self.daemon = False
        if autostart:  # immediately start dispatching
            super(LimitedDispatcher, self).start()

    def run(self):
        '''Do not use the method except for unthreaded testing purposes,
        the method normally is automatically called by `start` method.

        Loops forever: takes one item from the queue, delays if the
        throughput limit was hit, then calls it. Returns once a stop has
        been requested and an item (normally the wake-up `None` put by
        `stop`) is fetched from the queue.
        '''
        times = []  # used to store each callable dispatch time
        while True:
            item = self._queue.get()
            if self.__exit_req:
                return  # shutdown thread
            # delay routine
            now = curtime()
            t_delta = now - self.time_limit  # start of the current window
            if times and t_delta > times[-1]:
                # if last call was before the limit time-window
                # used to improve perf. in long-interval calls case
                times = [now]
            else:
                # collect last in current limit time-window
                times = [t for t in times if t >= t_delta]
                times.append(now)
            if len(times) >= self.burst_limit:  # throughput limit was hit
                # Sleep until the second-oldest timestamp leaves the
                # window. With burst_limit <= 1 `times` may hold only
                # `now`; fall back to it (a full `time_limit` sleep)
                # instead of raising IndexError and killing the thread.
                pivot = times[min(1, len(times) - 1)]
                time.sleep(pivot - t_delta)
            # finally dispatch one
            try:
                func, args, kwargs = item
                func(*args, **kwargs)
            except Exception as exc:  # re-route any exceptions
                self.exc_route(exc)   # to prevent thread exit

    def stop(self, timeout=None):
        '''Used to gently stop dispatching process and shutdown its thread.

        Callbacks still waiting in the queue when the stop request is
        noticed are not dispatched.

        Args:
            timeout (:obj:`float`): indicates maximum time to wait for
                dispatcher to stop and its thread to exit.
                If timeout exceeds and dispatcher has not stopped, method
                silently returns. `is_alive` method could be used
                afterwards to check the actual status. If `timeout` set to
                None, blocks until dispatcher is shut down.
                Defaults to None.

        Returns:
            None
        '''
        self.__exit_req = True  # gently request
        self._queue.put(None)   # put something to unfreeze if frozen
        # `Thread.join` raises RuntimeError for a thread that was never
        # started and for the current thread; in both cases there is
        # nothing to wait for, so just return.
        if self.is_alive() and threading.current_thread() is not self:
            super(LimitedDispatcher, self).join(timeout=timeout)

    @staticmethod
    def _default_exception_handler(exc):
        '''Dummy exception handler which re-raises exception in thread.

        Could be possibly overwritten by subclasses.
        '''
        raise exc

    def __call__(self, func, *args, **kwargs):
        '''Used to dispatch callbacks in throughput-limiting thread
        through queue.

        Args:
            func (:obj:`callable`): the actual function (or any callable)
                that is dispatched through queue.
            *args: variable-length `func` arguments.
            **kwargs: arbitrary keyword-arguments to `func`.

        Returns:
            None

        Raises:
            DispatchError: if the dispatcher thread is not alive or a stop
                has already been requested.
        '''
        if not self.is_alive() or self.__exit_req:
            raise DispatchError('Could not dispatch into stopped thread')
        self._queue.put((func, args, kwargs))
# self-test below
if __name__ == '__main__':
    # Usage: <script> [N [burst_limit [time_limit_ms [margin_ms]]]]
    # `argv` already has the script name stripped, so the first user
    # argument lives at index 0.
    argv = sys.argv[1:]
    N = int(argv[0]) if argv else 122
    burst_limit = int(argv[1]) if len(argv) > 1 else 30
    time_limit_ms = int(argv[2]) if len(argv) > 2 else 1000
    margin_ms = int(argv[3]) if len(argv) > 3 else 0
    print('Self-test with N = {} msgs, burst_limit = {} msgs, '
          'time_limit = {} ms, margin = {} ms'
          ''.format(N, burst_limit, time_limit_ms, margin_ms))

    testtimes = []  # moment of each actual call, in dispatch order

    def testcall():
        '''Record the time at which the dispatcher invoked the callback.'''
        testtimes.append(curtime())

    dsp = LimitedDispatcher(burst_limit=burst_limit,
                            time_limit_ms=time_limit_ms)
    print('Started dispatcher {}\nStatus: {}'
          ''.format(dsp, ['inactive', 'active'][dsp.is_alive()]))
    print('Dispatching {} calls @ {}'.format(N, time.asctime()))
    for i in range(N):
        dsp(testcall)
    print('Queue filled, waiting 4 dispatch finish @ ' + str(time.asctime()))
    # Poll with a short sleep rather than a hot loop, so the waiting main
    # thread does not compete with the dispatcher thread for the GIL.
    while not dsp._queue.empty():
        time.sleep(0.01)
    dsp.stop()
    print('Dispatcher ' + ['stopped', '!NOT STOPPED!'][dsp.is_alive()] +
          ' @ ' + str(time.asctime()))

    print('Calculating call time windows')
    # Any burst_limit+1 consecutive calls must span at least the time limit
    # (minus the allowed margin). Float division keeps this correct on
    # Python 2 as well.
    min_span = (time_limit_ms - margin_ms) / 1000.0
    passes, fails = [], []
    # `stop` is an exclusive slice bound, so it must run up to
    # len(testtimes) inclusive for the last window to be checked too.
    for start, stop in enumerate(range(burst_limit + 1, len(testtimes) + 1)):
        part = testtimes[start:stop]
        if (part[-1] - part[0]) >= min_span:
            passes.append(part)
        else:
            print(start, stop)
            fails.append(part)
    print('Tested: {}, Passed: {}, Failed: {}'
          ''.format(len(passes + fails), len(passes), len(fails)))
    if fails:
        print('(!) Got mismatches: ' + ';\n'.join(map(str, fails)))
    else:
        print('3 parts (for reference)\n' +
              ('-' * 80 + '\n').join(map(str, passes[:3])))
    print('*' * 80)
    print('RESULT: Test ' + ['failed', 'passed'][len(fails) == 0])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment