Created
August 7, 2016 10:20
-
-
Save ericsalim/89f52aadd47ace4793fc7d2803365073 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
##### start of rpicenter library code ##### | |
from abc import ABCMeta, abstractmethod | |
from threading import Thread | |
class Adapter(): | |
def __init__(self, device_object_id, slot, gpio_pin): | |
self._device_object_id = device_object_id | |
self._slot = slot | |
self._gpio_pin = gpio_pin | |
def output(self, value): | |
print('output slot {} pin {} value {}'.format(self._slot, self._gpio_pin, value)) | |
#TODO: gpio.output | |
def input(self): | |
print('input slot {} pin {}'.format(self._slot, self._gpio_pin)) | |
#TODO: gpio.input | |
class DeviceThread(Thread): | |
def __init__(self, device): | |
super(DeviceThread, self).__init__() | |
self._device = device | |
def run(self): | |
self._device.loop() | |
class Device: | |
__metaclass__ = ABCMeta | |
def __init__(self, *slots): | |
self._slots = slots | |
def _get_adapter(self, slot): | |
return core.get_adapter(self, slot) | |
@abstractmethod | |
def loop(self): | |
pass | |
@abstractmethod | |
def request_stop(self): | |
pass | |
class Core(): | |
def __init__(self): | |
self._devices = [] #type:Device[] | |
self._device_threads = [] #type:DeviceThread[] | |
self._adapters = [] #type: Adapter[] | |
def _check_slot_used(self, device, slot): | |
for a in self._adapters: | |
if a._device_object_id == id(device) and a._slot == slot: | |
return True | |
return False | |
def _check_pin_used(self, gpio_pin): | |
for a in self._adapters: | |
if a._gpio_pin == gpio_pin: | |
return True | |
return False | |
def register_adapter(self, device, slot, gpio_pin): | |
if self._check_slot_used(device, slot): | |
raise Exception('Slot is already used') | |
if self._check_pin_used(gpio_pin): | |
raise Exception('GPIO pin is already used') | |
self._adapters.append(Adapter(id(device), slot, gpio_pin)) | |
def get_adapter(self, device, slot): | |
for a in self._adapters: | |
if a._device_object_id == id(device) and a._slot == slot: | |
return a | |
raise Exception('Slot not found') | |
def register_device(self, device): | |
for d in self._devices: | |
if id(d) == id(device): | |
raise Exception('Device is already registered') | |
self._devices.append(device) | |
self._device_threads.append(DeviceThread(device)) | |
def loop(self): | |
for t in self._device_threads: | |
t.start() | |
def wait(self): | |
for t in self._device_threads: | |
t.join() | |
def request_stop(self): | |
for d in self._devices: | |
d.request_stop() | |
def cleanup(self): | |
self._adapters = None | |
self._device_threads = None | |
self._devices = None | |
#TODO: cleanup gpio | |
if __name__ == '__main__': | |
core = Core() | |
def register_adapter(device, slot, gpio_pin): | |
core.register_adapter(device, slot, gpio_pin) | |
def register_device(device): | |
core.register_device(device) | |
def loop(): | |
core.loop() | |
def wait(): | |
core.wait() | |
def request_stop(): | |
core.request_stop() | |
def cleanup(): | |
core.cleanup() | |
##### end of rpicenter library code ##### | |
##### start of component library creator code ##### | |
import time | |
# device library creator creates a class that derives from rpicenter.Device class | |
# this class must have a constructor that lists all slots | |
# this class must also have methods loop and request_stop | |
class Temperature(Device): | |
def __init__(self): | |
super(Temperature, self).__init__('temp_input', 'temp_output') | |
self._flagstop = False | |
def loop(self): | |
while True: | |
if self._flagstop: | |
return | |
self._get_adapter('temp_input').input() | |
self._get_adapter('temp_output').output('28') | |
if self._flagstop: | |
return | |
time.sleep(3) | |
def request_stop(self): | |
print('Temperature stop requested') | |
self._flagstop = True | |
class IRTransmitter(Device): | |
def __init__(self): | |
super(IRTransmitter, self).__init__('ir_output') | |
self._flagstop = False | |
def loop(self): | |
while True: | |
if self._flagstop: | |
return | |
self._get_adapter('ir_output').output('1101111011011') | |
if self._flagstop: | |
return | |
time.sleep(2) | |
def request_stop(self): | |
print('IRTransmitter stop requested') | |
self._flagstop = True | |
##### end of component library creator code ##### | |
##### start of end user code ##### | |
# the end user reuse the Temperature class | |
t = Temperature() | |
# the slots used by Temperature class is mapped to end user's gpio pins | |
register_adapter(t, 'temp_input', 4) | |
register_adapter(t, 'temp_output', 17) | |
register_device(t) | |
# registering another component | |
ir = IRTransmitter() | |
register_adapter(ir, 'ir_output', 20) | |
register_device(ir) | |
# start main loop | |
loop() | |
try: | |
# it runs for 20 secs | |
time.sleep(20) | |
request_stop() | |
except (KeyboardInterrupt): #stop on keyboard interrupt | |
request_stop() | |
wait() | |
cleanup() | |
##### end of end user code ##### |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment