Skip to content

Instantly share code, notes, and snippets.

@aisurfer
Created August 29, 2019 14:57
Show Gist options
  • Save aisurfer/805378a32be9976b611b99fc08dfb48a to your computer and use it in GitHub Desktop.
Save aisurfer/805378a32be9976b611b99fc08dfb48a to your computer and use it in GitHub Desktop.
PC threaded app skeleton
# coding: utf-8
from __future__ import print_function, division
import sys, os, re, signal, random
import time, threading
from device_thread import DeviceThread
class DeviceThread(threading.Thread):
    """Base class for a worker thread that repeatedly services one device.

    ``run()`` calls ``run_prepare()`` once, then calls ``process()`` in a loop
    until the ``stopped`` event is set, and finally calls ``run_finish()``.
    Subclasses override those three hooks; external code drives the thread
    through ``start_processing()`` and ``set_stop()``.

    Note: ``target``, ``args`` and ``kwargs`` are passed through to
    ``threading.Thread.__init__``, but the ``run()`` defined here never
    calls ``target``.
    """

    def __init__(self, group=None, target=None, name=None, daemon=True,
                 args=(), kwargs=None):
        """Initialise the thread without starting it.

        Args:
            group: forwarded to ``threading.Thread.__init__``.
            target: forwarded to ``threading.Thread.__init__``.
            name: thread name; the class name is used when it is None.
            daemon: value assigned to ``self.daemon`` (default True).
            args: forwarded to ``threading.Thread.__init__``.
            kwargs: forwarded to ``threading.Thread.__init__``; None (the
                default) lets the base class create a fresh dict instead of
                sharing one mutable default between instances.
        """
        threading.Thread.__init__(
            self, group=group, target=target,
            name=name if name is not None else self.__class__.__name__,
            args=args, kwargs=kwargs,
        )
        # Set after the base initialiser rather than passed to it, so the
        # constructor also works where Thread.__init__ has no daemon argument.
        self.daemon = daemon
        # Set to ask the processing loop in run() to exit.
        self.stopped = threading.Event()

    # --------Methods for processing -----------
    def run_prepare(self):
        """Hook called once at the start of ``run()``; does nothing here."""
        pass

    def process(self):
        """Perform one unit of work; called repeatedly by ``run()``.

        This placeholder prints a message and then idles for 1-2 seconds,
        returning early if a stop is requested in the meantime.
        """
        # DO SOME JOB
        print("device", self.name, "process")
        # Wait on the stop event instead of time.sleep() so that set_stop()
        # wakes the thread immediately rather than after the full delay.
        self.stopped.wait(1 + random.random())
        # commands should be processed somewhere in this method
        # if command is disconnect do self.stopped.set()

    def run_finish(self):
        """Hook called once when the processing loop ends; does nothing here."""
        pass

    # --------Methods for threading -----------
    # must be called from EXTERNAL code
    def start_processing(self):
        """Clear the stop flag, start the thread and print a start message."""
        self.stopped.clear()
        self.start()
        print("device", self.name, "started")

    def run(self):
        """Thread body: prepare, loop over ``process()``, then finish."""
        self.run_prepare()
        try:
            while not self.stopped.is_set():
                self.process()
        finally:
            # Run the cleanup hook even when process() raises.
            self.run_finish()
            print("device", self.name, "finished")

    def set_stop(self):
        """Request the processing loop to exit by setting the stop event."""
        self.stopped.set()
class SomeDevice(DeviceThread):
    """Concrete device; currently inherits everything from DeviceThread."""
    # device implementation should be here
class PC:
    """Controller that creates device threads, starts them and waits for them."""

    def __init__(self):
        # Every device thread created by run(), in creation order.
        self.devices = []

    def run(self, device_count=5):
        """Create and start the devices, then block until they all finish.

        Args:
            device_count: number of ``SomeDevice`` threads to create, named
                "d0", "d1", ... (default 5).

        On Ctrl+C (KeyboardInterrupt) every device is asked to stop and is
        joined, then the process exits through ``sys.exit(0)``.
        """
        # CREATE
        created = [SomeDevice(name="d" + str(i)) for i in range(device_count)]
        self.devices.extend(created)
        # START WORK
        # Start only the devices created by this call: a thread can be
        # started at most once, so re-starting earlier ones would raise.
        for d in created:
            d.start_processing()
        # WAIT FOR FINISHING
        try:
            # Watch our own devices instead of threading.active_count(), which
            # also counts unrelated threads and would keep this loop spinning
            # after every device has finished.
            while any(d.is_alive() for d in self.devices):
                for d in self.devices:
                    if d.is_alive():
                        # Short timeout keeps the main thread responsive
                        # to Ctrl+C while waiting.
                        d.join(0.1)
        except KeyboardInterrupt:
            print("Got Ctrl+C")
            # Signal every device first so they all wind down in parallel,
            # then wait for each one.
            for d in self.devices:
                d.set_stop()
            for d in self.devices:
                if d.is_alive():
                    d.join()
            print("Exit by Ctrl+C")
            sys.exit(0)
if __name__ == "__main__":
    # Script entry point: build the controller and run it until every
    # device thread has finished (or Ctrl+C is pressed).
    PC().run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment