Skip to content

Instantly share code, notes, and snippets.

@vestige
Last active June 20, 2021 06:56
Show Gist options
  • Save vestige/cbf2f69f958d160138c637046a8a1d5d to your computer and use it in GitHub Desktop.
Save vestige/cbf2f69f958d160138c637046a8a1d5d to your computer and use it in GitHub Desktop.
from queue import Queue
import threading
import time
from gpiozero import LED
from gpiozero.pins.pigpio import PiGPIOFactory
import sys
from icecream import ic
# GPIO pin numbers handed to gpiozero's LED().
# NOTE(review): presumably BCM numbering (gpiozero's default) - confirm against the wiring.
PIN_LED_NO1 = 16
PIN_LED_NO2 = 20

# Logical LED name (the "name" field of a queue message) -> GPIO pin number.
LED_DICT = dict(no1=PIN_LED_NO1, no2=PIN_LED_NO2)
class LedThread(threading.Thread):
    """Worker thread that switches LEDs according to messages read from a queue.

    Messages are dicts placed on :attr:`rcv_que`.  Example: receiving
    ``{"type": "led", "name": "no1", "action": "on"}`` turns on the LED
    registered under ``"no1"`` in ``LED_DICT``.  An action that does not
    contain ``"on"`` turns the LED off.  Messages whose ``"type"`` does not
    contain ``"led"``, or that lack one of the three keys, are reported and
    skipped; messages naming an unknown LED are ignored.
    """

    # Private marker that stop() enqueues so a run() blocked in get() wakes up.
    _STOP = object()

    def __init__(self):
        """Create one LED object per ``LED_DICT`` entry (does not start the thread)."""
        # daemon=True replaces the deprecated setDaemon(True) call.
        super().__init__(daemon=True)
        self.stop_event = threading.Event()
        self._rcv_que = Queue()
        # Build the pin factory once and share it between all LEDs instead of
        # constructing a separate factory for every LED.
        factory = PiGPIOFactory()
        self._leds = {
            key: LED(pin, pin_factory=factory) for key, pin in LED_DICT.items()
        }

    def stop(self):
        """Ask :meth:`run` to exit.

        Messages still waiting in the queue when this is called are not
        guaranteed to be processed.
        """
        self.stop_event.set()
        # run() may be blocked in Queue.get(); the marker wakes it up so the
        # stop request is noticed without waiting for another LED message.
        self._rcv_que.put(self._STOP)

    def run(self):
        """Process queued LED messages until :meth:`stop` is called."""
        while not self.stop_event.is_set():
            value = self.rcv_que.get()
            if value is self._STOP:
                break
            ic("[led_th]", value)

            try:
                msg_type = value["type"]
                name = value["name"]
                action = value["action"]
            except (KeyError, TypeError):
                # Malformed message: report it and keep serving rather than
                # letting the exception kill the worker thread.
                ic("[led_th]", "error!!!")
                continue

            # Containment test (not equality), as in the original protocol.
            if "led" not in msg_type:
                ic("[led_th]", "error!!!")
                continue

            if name in self._leds:
                self._write_led(name, "on" in action)

    @property
    def rcv_que(self):
        """Queue on which callers post LED messages for this thread."""
        return self._rcv_que

    def _write_led(self, name, on_off):
        """Switch the LED registered as *name* on (truthy *on_off*) or off."""
        led = self._leds[name]
        if on_off:
            led.on()
        else:
            led.off()
def main():
    """Demo: light LED "no1" for 3 s, then LED "no2" for 3 s, via the worker thread."""
    import time

    worker = LedThread()
    worker.start()
    inbox = worker.rcv_que

    # (LED name, action, seconds to wait after sending the message)
    script = (
        ("no1", "on", 3),
        ("no1", "off", 1),
        ("no2", "on", 3),
        ("no2", "off", 1),
    )
    for led_name, action, pause in script:
        inbox.put({"type": "led", "name": led_name, "action": action})
        time.sleep(pause)

    worker.stop()


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment