Skip to content

Instantly share code, notes, and snippets.

@ericsalim
Created August 7, 2016 10:20
Show Gist options
  • Save ericsalim/89f52aadd47ace4793fc7d2803365073 to your computer and use it in GitHub Desktop.
Save ericsalim/89f52aadd47ace4793fc7d2803365073 to your computer and use it in GitHub Desktop.
##### start of rpicenter library code #####
from abc import ABCMeta, abstractmethod
from threading import Thread
class Adapter():
def __init__(self, device_object_id, slot, gpio_pin):
self._device_object_id = device_object_id
self._slot = slot
self._gpio_pin = gpio_pin
def output(self, value):
print('output slot {} pin {} value {}'.format(self._slot, self._gpio_pin, value))
#TODO: gpio.output
def input(self):
print('input slot {} pin {}'.format(self._slot, self._gpio_pin))
#TODO: gpio.input
class DeviceThread(Thread):
def __init__(self, device):
super(DeviceThread, self).__init__()
self._device = device
def run(self):
self._device.loop()
class Device:
__metaclass__ = ABCMeta
def __init__(self, *slots):
self._slots = slots
def _get_adapter(self, slot):
return core.get_adapter(self, slot)
@abstractmethod
def loop(self):
pass
@abstractmethod
def request_stop(self):
pass
class Core():
def __init__(self):
self._devices = [] #type:Device[]
self._device_threads = [] #type:DeviceThread[]
self._adapters = [] #type: Adapter[]
def _check_slot_used(self, device, slot):
for a in self._adapters:
if a._device_object_id == id(device) and a._slot == slot:
return True
return False
def _check_pin_used(self, gpio_pin):
for a in self._adapters:
if a._gpio_pin == gpio_pin:
return True
return False
def register_adapter(self, device, slot, gpio_pin):
if self._check_slot_used(device, slot):
raise Exception('Slot is already used')
if self._check_pin_used(gpio_pin):
raise Exception('GPIO pin is already used')
self._adapters.append(Adapter(id(device), slot, gpio_pin))
def get_adapter(self, device, slot):
for a in self._adapters:
if a._device_object_id == id(device) and a._slot == slot:
return a
raise Exception('Slot not found')
def register_device(self, device):
for d in self._devices:
if id(d) == id(device):
raise Exception('Device is already registered')
self._devices.append(device)
self._device_threads.append(DeviceThread(device))
def loop(self):
for t in self._device_threads:
t.start()
def wait(self):
for t in self._device_threads:
t.join()
def request_stop(self):
for d in self._devices:
d.request_stop()
def cleanup(self):
self._adapters = None
self._device_threads = None
self._devices = None
#TODO: cleanup gpio
if __name__ == '__main__':
core = Core()
def register_adapter(device, slot, gpio_pin):
core.register_adapter(device, slot, gpio_pin)
def register_device(device):
core.register_device(device)
def loop():
core.loop()
def wait():
core.wait()
def request_stop():
core.request_stop()
def cleanup():
core.cleanup()
##### end of rpicenter library code #####
##### start of component library creator code #####
import time
# device library creator creates a class that derives from rpicenter.Device class
# this class must have a constructor that lists all slots
# this class must also have methods loop and request_stop
class Temperature(Device):
def __init__(self):
super(Temperature, self).__init__('temp_input', 'temp_output')
self._flagstop = False
def loop(self):
while True:
if self._flagstop:
return
self._get_adapter('temp_input').input()
self._get_adapter('temp_output').output('28')
if self._flagstop:
return
time.sleep(3)
def request_stop(self):
print('Temperature stop requested')
self._flagstop = True
class IRTransmitter(Device):
def __init__(self):
super(IRTransmitter, self).__init__('ir_output')
self._flagstop = False
def loop(self):
while True:
if self._flagstop:
return
self._get_adapter('ir_output').output('1101111011011')
if self._flagstop:
return
time.sleep(2)
def request_stop(self):
print('IRTransmitter stop requested')
self._flagstop = True
##### end of component library creator code #####
##### start of end user code #####
# the end user reuse the Temperature class
t = Temperature()
# the slots used by Temperature class is mapped to end user's gpio pins
register_adapter(t, 'temp_input', 4)
register_adapter(t, 'temp_output', 17)
register_device(t)
# registering another component
ir = IRTransmitter()
register_adapter(ir, 'ir_output', 20)
register_device(ir)
# start main loop
loop()
try:
# it runs for 20 secs
time.sleep(20)
request_stop()
except (KeyboardInterrupt): #stop on keyboard interrupt
request_stop()
wait()
cleanup()
##### end of end user code #####
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment