Skip to content

Instantly share code, notes, and snippets.

@PeterMitrano
Last active February 6, 2021 17:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PeterMitrano/ea3a89c1766a60400934b32844e48d84 to your computer and use it in GitHub Desktop.
Save PeterMitrano/ea3a89c1766a60400934b32844e48d84 to your computer and use it in GitHub Desktop.
Example of a class with a thread
#!/usr/bin/env python
import threading
import time
import weakref
class BadRobot:
def __init__(self):
self.should_disconnect = False
self.command_thread = threading.Thread(target=self.command_thread_func)
print(f"Starting {self}")
self.command_thread.start()
self.latest_command = None
def __del__(self):
# NOTE: this will never be called :(
print(f'deleting BadRobot {self}')
self.should_disconnect = True
def set_command(self, command):
self.latest_command = command
def command_thread_func(self):
while True:
print(f"{self} sending self.latest_command to motor controllers...")
if self.should_disconnect:
break
time.sleep(0.1)
class Robot:
def __init__(self):
self.should_disconnect = False
# NOTE: The trick is to use the pure function name Robot.threaded_func, which is not already bound to 'self'
# and then to pass a weak ref to thread_func instead and
self.command_thread = threading.Thread(target=Robot.command_thread_func, args=(weakref.proxy(self),))
print(f"Starting {self}")
self.command_thread.start()
self.latest_command = None
def __del__(self):
print(f'deleting Robot {self}')
self.should_disconnect = True
def set_command(self, command):
self.latest_command = command
def command_thread_func(self):
# NOTE: here self is a weakref, so Robot is eligable for garbage collection
# as even if this function is still running. Without the weak ref,
# Robot would be kept alive because this function still references 'self'
try:
while True:
print(f"{self} sending self.latest_command to motor controllers...")
if self.should_disconnect:
break
time.sleep(0.1)
except ReferenceError:
pass
def main():
# NOTE: this function creates "scope" for robot, which is necessary for this test
robot = Robot()
# NOTE: the following would never exit :(
#robot = BadRobot()
if __name__ == '__main__':
main()
#print("ok, BadRobot should be dead now.... but it's not.")
# NOTE: the following would never exit, because robot is never considered for garbage collection
#robot = Robot()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment