Skip to content

Instantly share code, notes, and snippets.

@zzeleznick
Last active September 13, 2020 20:26
Show Gist options
  • Save zzeleznick/6f7de6c3bb89d294a491f0089750b3a7 to your computer and use it in GitHub Desktop.
Save zzeleznick/6f7de6c3bb89d294a491f0089750b3a7 to your computer and use it in GitHub Desktop.
Resumable Threads with Timer
$ python --version
Python 2.7.10
$ python timer_demo.py
2020-09-13 13:21:13,050 INFO MainProcess root Demo Begins
2020-09-13 13:21:13,051 INFO MainProcess root Starting the countdown
2020-09-13 13:21:13,051 DEBUG MainProcess rthread_logger Thread <ResumeableThread(T1, started daemon 123145348845568)> was wrapped
2020-09-13 13:21:13,051 INFO MainProcess root Waiting 3 seconds to pause
2020-09-13 13:21:13,051 INFO MainProcess root 10
2020-09-13 13:21:14,051 INFO MainProcess root 9
2020-09-13 13:21:15,055 INFO MainProcess root 8
2020-09-13 13:21:16,051 INFO MainProcess root Pausing the timer thread
2020-09-13 13:21:16,051 INFO MainProcess rthread_logger <ResumeableThread(T1, started daemon 123145348845568)> paused
2020-09-13 13:21:16,051 INFO MainProcess root Waiting 2 seconds to resume
2020-09-13 13:21:16,056 INFO MainProcess root 7
2020-09-13 13:21:17,060 INFO MainProcess root 6
2020-09-13 13:21:18,052 INFO MainProcess rthread_logger <ResumeableThread(T1, started daemon 123145348845568)> resumed
2020-09-13 13:21:18,052 INFO MainProcess root Countdown resumed
2020-09-13 13:21:18,052 INFO MainProcess root Waiting 3 seconds to stop
2020-09-13 13:21:18,065 INFO MainProcess root 5
2020-09-13 13:21:19,070 INFO MainProcess root 4
2020-09-13 13:21:20,072 INFO MainProcess root 3
2020-09-13 13:21:21,058 INFO MainProcess rthread_logger <ResumeableThread(T1, started daemon 123145348845568)> stopped
2020-09-13 13:21:21,058 INFO MainProcess root Demo complete
$
import logging
import threading
from functools import wraps
from warnings import warn
# Create logger
rthread_logger = logging.getLogger("rthread_logger")
def no_args(fnc):
return (fnc.__code__.co_argcount - int(hasattr(fnc, '__self__'))) < 1
class ResumeableThread(threading.Thread):
"""Thread class with methods to pause, resume, and stop.
The target method has to check for event conditions.
Based on Timer https://github.com/python/cpython/blob/2.7/Lib/threading.py#L1058
"""
def __init__(self, target, name=None, args=(), kwargs=None):
super(ResumeableThread, self).__init__(target=target, name=name, args=args, kwargs=kwargs)
self._pause_event = threading.Event()
self._resume_event = threading.Event()
self._stop_event = threading.Event()
@staticmethod
def _resumeable(fnc):
"""Abstracts thread property checks to enable a target function to become looping and resumable."""
@wraps(fnc)
def wrapped(*args, **kwargs):
thread = threading.current_thread()
rthread_logger.debug("Thread {} was wrapped".format(thread))
if not issubclass(type(thread), ResumeableThread):
raise RuntimeError("resumable wrapper can only be used within a ResumeableThread")
while not thread.stopped:
if thread.paused:
rthread_logger.info("Thread {} was paused".format(thread))
thread.wait_to_resume() # Wait for unpause
rthread_logger.info("Thread {} was unpaused".format(thread))
# Actual work here:
fnc(*args, **kwargs)
# Mark the marked function as wrapped
wrapped.__setattr__("resumeable_wrapped", True)
# Return the wrapped resumeable function
return wrapped
def run(self):
try:
if self._Thread__target:
ResumeableThread._resumeable(self._Thread__target)(
*self._Thread__args, **self._Thread__kwargs)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._Thread__target, self._Thread__args, self._Thread__kwargs
def pause(self):
if self.stopped:
warn("%s is already stopped" % str(self))
self._pause_event.set()
self._resume_event.clear()
rthread_logger.info("%s paused" % str(self))
def resume(self):
self._resume_event.set()
self._pause_event.clear()
rthread_logger.info("%s resumed" % str(self))
def stop(self):
self._stop_event.set()
self._pause_event.clear()
self._resume_event.clear()
rthread_logger.info("%s stopped" % str(self))
def wait_to_resume(self):
"""Waits (blocking) until pause event is set"""
rthread_logger.info("%s waiting to resume" % str(self))
self._resume_event.wait()
@property
def paused(self):
return self._pause_event.is_set()
@property
def resumed(self):
return self._resume_event.is_set()
@property
def stopped(self):
return self._stop_event.is_set()
Name of the gist
import time
import logging
# Internal Imports
from resumeable_thread import ResumeableThread
# Setup Logging
logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s %(levelname)-10s %(processName)s %(name)s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
def countdown(start=10, tick=1):
val = start
while val > 0:
logger.info(val)
time.sleep(tick)
val -= 1
logger.info("Happy New Years!")
logger.info("Demo Begins")
thread = ResumeableThread(target=countdown, name="T1", args=(10,))
thread.daemon = True
def demo_story_points():
yield "Starting the countdown"
thread.start()
yield "Waiting 3 seconds to pause"
time.sleep(3)
yield "Pausing the timer thread"
thread.pause()
yield "Waiting 2 seconds to resume"
time.sleep(2)
thread.resume()
yield "Countdown resumed"
yield "Waiting 3 seconds to stop"
time.sleep(3)
thread.stop()
for message in demo_story_points():
logger.info(message)
logger.info("Demo complete")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment