Skip to content

Instantly share code, notes, and snippets.

@samarthbhargav
Last active June 20, 2022 09:50
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save samarthbhargav/5a515a399f7113137331 to your computer and use it in GitHub Desktop.
Save samarthbhargav/5a515a399f7113137331 to your computer and use it in GitHub Desktop.
A 'stoppable' thread for Python
# source: https://www.safaribooksonline.com/library/view/python-cookbook-2nd/0596007973/ch09s03.html
import threading
class TestThread(threading.Thread):
def _ _init_ _(self, name='TestThread'):
""" constructor, setting initial variables """
self._stopevent = threading.Event( )
self._sleepperiod = 1.0
threading.Thread._ _init_ _(self, name=name)
def run(self):
""" main control loop """
print "%s starts" % (self.getName( ),)
count = 0
while not self._stopevent.isSet( ):
count += 1
print "loop %d" % (count,)
self._stopevent.wait(self._sleepperiod)
print "%s ends" % (self.getName( ),)
def join(self, timeout=None):
""" Stop the thread and wait for it to end. """
self._stopevent.set( )
threading.Thread.join(self, timeout)
if _ _name_ _ == "_ _main_ _":
testthread = TestThread( )
testthread.start( )
import time
time.sleep(5.0)
testthread.join( )
@woswos
Copy link

woswos commented Jun 12, 2020

Thanks for the example. Here is the python3 compatible version if anyone wants:

import threading

class TestThread(threading.Thread):
    def __init__(self, name='TestThread'):
        """ constructor, setting initial variables """
        self._stopevent = threading.Event()
        self._sleepperiod = 1.0
        threading.Thread.__init__(self, name=name)

    def run(self):
        """ main control loop """
        print("%s starts" % (self.getName(),))
        count = 0
        while not self._stopevent.isSet():
            count += 1
            print("loop %d" % (count,))
            self._stopevent.wait(self._sleepperiod)
        print("%s ends" % (self.getName(),))

    def join(self, timeout=None):
        """ Stop the thread and wait for it to end. """
        self._stopevent.set()
        threading.Thread.join(self, timeout)


if __name__ == "__main__":
    testthread = TestThread()
    testthread.start()
    import time
    time.sleep(5.0)
    testthread.join()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment