Created
October 19, 2017 18:36
-
-
Save duggan/fa3224a601b244e0ad2cc8bc2b37e0e1 to your computer and use it in GitHub Desktop.
Radio Gate Controller
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import signal | |
import time | |
import logging | |
import RPi.GPIO as GPIO | |
from collections import deque | |
from threading import Thread, Lock, Event | |
from twilio.rest import Client | |
from blink1_pyusb import Blink1 | |
# When True, Dialler.dial() only logs the number and never contacts Twilio.
DRY_RUN = True

# Passed as the `from_` argument of every outgoing Twilio call.
HOME = "CCC"

# Twilio credentials plus the number to dial for each remote button.
# The "A" / "B" keys are looked up through PINS when a GPIO pin fires.
CONFIG = {
    "account_sid": "XXX",
    "auth_token": "YYY",
    "A": "AAA",
    "B": "BBB",
}

# GPIO channels (BCM numbering, see GPIO.setmode in main) wired to the
# receiver outputs for buttons A and B.
A = 3
B = 2

# Maps a GPIO channel to the CONFIG key holding the number to dial.
PINS = {A: "A", B: "B"}
class Receiver(object):
    """Ties a GPIO edge event to the two alarm actions: light and phone call.

    Owns one Klaxon and one Dialler, both built from the same config,
    logger and shutdown event that this object was given.
    """

    def __init__(self, config, logger, shutdown):
        """Store the shared collaborators and build the klaxon and dialler.

        config   -- mapping with Twilio credentials and per-button numbers
        logger   -- logger handed on to the klaxon and dialler
        shutdown -- event the worker threads poll to know when to stop
        """
        self.shutdown = shutdown
        self.config = config
        self.logger = logger
        # Klaxon is constructed first, then the dialler.
        self.klaxon = Klaxon(config, logger, shutdown)
        self.dialler = Dialler(config, logger, shutdown)

    def start(self):
        """Start the dialler's background queue runner."""
        self.dialler.run()

    def signal(self, pin):
        """Handle an edge on `pin`: fire the klaxon, then dial that pin's number.

        Registered in main() as the GPIO event callback. The klaxon runs on
        its own thread; the call is placed by invoking Dialler.dial directly
        on the calling thread (it does not go through the dialler's queue).
        """
        self.klaxon.start()
        config_key = PINS[pin]
        number = self.config[config_key]
        self.dialler.dial(number)
class Klaxon(object):
    """Visual alarm that cycles a blink(1) light through red, green and blue.

    Only one alarm runs at a time: a non-blocking lock guards run(), and a
    second trigger while one is in progress is logged and ignored.
    """

    # Colours shown in each cycle, in order, as (red, green, blue).
    _COLOURS = ((255, 0, 0), (0, 255, 0), (0, 0, 255))

    def __init__(self, config, logger, shutdown):
        """Open the blink(1) device and remember the shared collaborators.

        config   -- stored but not otherwise read by this class
        logger   -- logger used for status and error messages
        shutdown -- threading.Event; when set, a running alarm stops early
        """
        self.shutdown = shutdown
        self.config = config
        self.logger = logger
        self.lock = Lock()
        self.blink1 = Blink1()
        if self.blink1.dev is None:
            self.logger.error("No blink(1) device found.")

    def _flash(self):
        """Run up to 20 colour cycles, stopping as soon as shutdown is set."""
        for _ in range(20):
            for red, green, blue in self._COLOURS:
                if self.shutdown.is_set():
                    return
                self.blink1.fade_to_rgb(500, red, green, blue)
                # Hold the colour for 0.5s, but wake immediately on shutdown.
                self.shutdown.wait(0.5)

    def run(self):
        """Flash the light, then fade it to black.

        Returns immediately (with a warning) if an alarm is already running.
        Any exception raised by the device propagates to the caller, but the
        lock is always released so a later trigger can try again.
        """
        if not self.lock.acquire(False):
            self.logger.warning("Klaxon already running.")
            return
        try:
            self.logger.info("Firing klaxon")
            self._flash()
            self.blink1.fade_to_rgb(1000, 0, 0, 0)
        finally:
            # Without this a device error would leave the lock held forever
            # and every later trigger would report "already running".
            self.lock.release()
        self.logger.info("Klaxon ended")

    def start(self):
        """Run the alarm on a new background thread."""
        thread = Thread(target=self.run)
        thread.start()
class Dialler(object):
    """Places outgoing Twilio calls, either directly or from a queue.

    call() enqueues a number; runner() (started by run()) drains the queue
    oldest-first, placing each call on its own thread. dial() places a
    single call synchronously.
    """

    # Seconds the runner waits between queue polls. Class-level so it can
    # be overridden per instance without changing the constructor.
    POLL_INTERVAL = 1

    def __init__(self, config, logger, shutdown):
        """Build the Twilio client and an empty call queue.

        config   -- mapping providing 'account_sid' and 'auth_token'
        logger   -- logger used for status messages
        shutdown -- threading.Event; when set, runner() exits
        """
        self.shutdown = shutdown
        self.logger = logger
        self.config = config
        self.queue = deque()
        self.client = Client(self.config['account_sid'], self.config['auth_token'])

    def call(self, number):
        """Queue `number` to be dialled by the background runner."""
        self.queue.append(number)

    def dial(self, number):
        """Place a call to `number` now, or just log it when DRY_RUN is set."""
        self.logger.info("Dialling %s...", number)
        if not DRY_RUN:
            call = self.client.calls.create(to=number, from_=HOME,
                                            url="https://www.twilio.com/docs/twiml-snippet/quickstart")
            self.logger.info("Call placed (id: %s)", call.sid)
        else:
            self.logger.info("Dry run, call not placed.")

    def runner(self):
        """Dial queued numbers, one per poll, until shutdown is set."""
        while not self.shutdown.is_set():
            try:
                # popleft() keeps the queue first-in, first-out.
                number = self.queue.popleft()
            except IndexError:
                # Nothing queued; fall through to the wait below.
                pass
            else:
                # args must be a tuple: a bare string would be unpacked into
                # one positional argument per character.
                Thread(target=self.dial, args=(number,)).start()
            # Behaves like a sleep, but returns at once when shutdown is set.
            self.shutdown.wait(self.POLL_INTERVAL)

    def run(self):
        """Start runner() on a new background thread."""
        thread = Thread(target=self.runner)
        thread.start()
def main():
    """Run the gate controller until SIGTERM or SIGINT is received.

    Sets up logging and signal handlers, blanks the blink(1) light, starts
    the receiver (and with it the dialler's background thread), registers
    rising-edge callbacks on GPIO channels A and B, then idles until the
    shutdown event is set.
    """
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(message)s')
    shutdown = Event()

    def service_shutdown(signum, frame):
        """Signal handler: flag every worker loop to stop."""
        print('Caught signal %d' % signum)
        shutdown.set()

    signal.signal(signal.SIGTERM, service_shutdown)
    signal.signal(signal.SIGINT, service_shutdown)

    logger = logging.getLogger('controller')

    logger.info("Resetting blink(1)...")
    blink1 = Blink1()
    blink1.fade_to_rgb(0, 0, 0, 0)

    logger.info("Setting up receiver...")
    receiver = Receiver(CONFIG, logger, shutdown)
    receiver.start()

    # From here on a background thread is polling `shutdown`. If anything
    # below raises, the event must still be set, otherwise that non-daemon
    # thread keeps the process alive forever.
    try:
        logger.info("Configuring GPIO...")
        GPIO.setmode(GPIO.BCM)
        for pin in (A, B):
            GPIO.setup(pin, GPIO.IN)
            GPIO.add_event_detect(pin, GPIO.RISING)
            GPIO.add_event_callback(pin, receiver.signal)

        logger.info("Running.")
        while not shutdown.is_set():
            time.sleep(0.1)
    finally:
        shutdown.set()
        logger.info("Shutting down...")
        GPIO.cleanup()

    # Presumably a grace period for worker threads to notice the shutdown
    # event before the interpreter exits.
    time.sleep(5)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment