Skip to content

Instantly share code, notes, and snippets.

@duggan
Created October 19, 2017 18:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save duggan/fa3224a601b244e0ad2cc8bc2b37e0e1 to your computer and use it in GitHub Desktop.
Save duggan/fa3224a601b244e0ad2cc8bc2b37e0e1 to your computer and use it in GitHub Desktop.
Radio Gate Controller
import signal
import time
import logging
import RPi.GPIO as GPIO
from collections import deque
from threading import Thread, Lock, Event
from twilio.rest import Client
from blink1_pyusb import Blink1
# When true, Dialler.dial() only logs the number and never places a call.
DRY_RUN = True

# Value passed as the `from_` (caller) argument when a call is created.
HOME = "CCC"

# Twilio credentials plus, under the keys "A" and "B", the number dialled
# when the matching input pin fires. The values look like placeholders --
# presumably to be filled in before real use.
CONFIG = dict(
    account_sid="XXX",
    auth_token="YYY",
    A="AAA",
    B="BBB",
)

# GPIO input pins watched by main() (BCM numbering, as selected there).
A, B = 3, 2

# Pin number -> CONFIG key holding the number to dial for that pin.
PINS = {
    A: "A",
    B: "B",
}
class Receiver(object):
    """Turns a GPIO pin event into a klaxon alarm plus an outgoing call.

    Owns one Klaxon and one Dialler, both built from the same config,
    logger and shutdown event that are handed to the constructor.
    """

    def __init__(self, config, logger, shutdown):
        """Keep the shared collaborators and build the klaxon and dialler.

        Args:
            config: mapping read for the number to dial (see ``signal``)
                and passed on to the Klaxon and Dialler.
            logger: logger passed on to the Klaxon and Dialler.
            shutdown: event passed on to the Klaxon and Dialler.
        """
        self.shutdown, self.config, self.logger = shutdown, config, logger
        self.klaxon = Klaxon(config, logger, shutdown)
        self.dialler = Dialler(config, logger, shutdown)

    def start(self):
        """Start the dialler's background runner."""
        dialler = self.dialler
        dialler.run()

    def signal(self, pin):
        """Handle an event on ``pin``: fire the klaxon, then dial out.

        The number dialled is ``config[PINS[pin]]``; a pin missing from
        PINS (or a key missing from the config) raises KeyError after the
        klaxon has already been started.

        NOTE(review): this dials synchronously through Dialler.dial()
        rather than enqueueing with Dialler.call() -- confirm that
        bypassing the dialler queue is intended.
        """
        self.klaxon.start()
        config_key = PINS[pin]
        number = self.config[config_key]
        self.dialler.dial(number)
class Klaxon(object):
    """Flashes a blink(1) LED through a red/green/blue alarm sequence.

    Only one sequence runs at a time: a non-blocking lock makes a second
    trigger log a warning and return while the first is still flashing.
    """

    def __init__(self, config, logger, shutdown):
        """Open the blink(1) device and keep the shared collaborators.

        Args:
            config: stored on the instance; not read by this class.
            logger: logger used for status and error messages.
            shutdown: event checked between colour cycles so the sequence
                stops once shutdown has been requested.
        """
        self.shutdown = shutdown
        self.config = config
        self.logger = logger
        self.lock = Lock()
        self.blink1 = Blink1()
        if self.blink1.dev is None:
            # Only reported here; later LED calls are still attempted.
            self.logger.error("No blink(1) device found.")

    def run(self):
        """Play the alarm sequence unless one is already in progress.

        Runs up to 20 red/green/blue cycles (0.5 s per colour), stopping
        early once the shutdown event is set, then fades the LED to off.
        The lock is always released, even if an LED call raises, so a
        device error cannot leave the klaxon permanently "already running".
        """
        if not self.lock.acquire(False):
            self.logger.warning("Klaxon already running.")
            return
        try:
            self.logger.info("Firing klaxon")
            for _ in range(20):
                if self.shutdown.is_set():
                    # Nothing more to flash once shutdown is requested.
                    break
                for red, green, blue in ((255, 0, 0), (0, 255, 0), (0, 0, 255)):
                    self.blink1.fade_to_rgb(500, red, green, blue)
                    time.sleep(0.5)
            self.blink1.fade_to_rgb(1000, 0, 0, 0)
        finally:
            self.lock.release()
        self.logger.info("Klaxon ended")

    def start(self):
        """Run the alarm sequence on a new thread and return immediately."""
        thread = Thread(target=self.run)
        thread.start()
class Dialler(object):
    """Places outgoing calls through Twilio, directly or via a queue.

    ``dial`` places (or, in dry-run mode, only logs) a single call.
    ``call`` enqueues a number; the background ``runner`` started by
    ``run`` takes queued numbers in arrival order and dials each one on
    its own thread.
    """

    # Seconds the runner waits between queue polls, and therefore the
    # minimum spacing between two queued calls.
    poll_interval = 1

    def __init__(self, config, logger, shutdown):
        """Create the Twilio client and an empty call queue.

        Args:
            config: mapping providing ``account_sid`` and ``auth_token``.
            logger: logger used for status messages.
            shutdown: event that stops the runner loop once set.
        """
        self.shutdown = shutdown
        self.logger = logger
        self.config = config
        self.queue = deque()
        self.client = Client(self.config['account_sid'], self.config['auth_token'])

    def call(self, number):
        """Queue ``number`` to be dialled by the background runner."""
        self.queue.append(number)

    def dial(self, number):
        """Dial ``number`` now, or just log it when DRY_RUN is true."""
        self.logger.info("Dialling %s...", number)
        if DRY_RUN:
            self.logger.info("Dry run, call not placed.")
            return
        call = self.client.calls.create(
            to=number, from_=HOME,
            url="https://www.twilio.com/docs/twiml-snippet/quickstart")
        self.logger.info("Call placed (id: %s)", call.sid)

    def runner(self):
        """Dial queued numbers, oldest first, until shutdown is set.

        Each number is dialled on its own thread so a slow call does not
        hold up the queue. Between polls the loop waits on the shutdown
        event instead of sleeping, so it exits as soon as shutdown is
        requested.
        """
        while not self.shutdown.is_set():
            try:
                number = self.queue.popleft()
            except IndexError:
                # Queue is empty; nothing to dial on this pass.
                pass
            else:
                # args must be a 1-tuple: a bare string would be unpacked
                # character by character into separate arguments.
                Thread(target=self.dial, args=(number,)).start()
            self.shutdown.wait(self.poll_interval)

    def run(self):
        """Start ``runner`` on a new thread and return immediately."""
        thread = Thread(target=self.runner)
        thread.start()
def main():
    """Run the gate controller until SIGTERM or SIGINT is received.

    Sets up logging and signal handlers, switches the blink(1) LED off,
    starts the receiver, then registers rising-edge callbacks on pins A
    and B and idles until shutdown is requested.
    """
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(message)s')
    shutdown = Event()

    def service_shutdown(signum, frame):
        """Signal handler: request a shutdown of the main loop and workers."""
        print('Caught signal %d' % signum)
        shutdown.set()

    signal.signal(signal.SIGTERM, service_shutdown)
    signal.signal(signal.SIGINT, service_shutdown)

    logger = logging.getLogger('controller')

    logger.info("Resetting blink(1)...")
    blink1 = Blink1()
    blink1.fade_to_rgb(0, 0, 0, 0)

    logger.info("Setting up receiver...")
    receiver = Receiver(CONFIG, logger, shutdown)
    receiver.start()

    # From here on a background thread is running that only stops once
    # `shutdown` is set, so any failure below must still set it (and
    # release the GPIO pins) or the process would hang on exit.
    try:
        logger.info("Configuring GPIO...")
        GPIO.setmode(GPIO.BCM)
        for pin in (A, B):
            GPIO.setup(pin, GPIO.IN)
            GPIO.add_event_detect(pin, GPIO.RISING)
            GPIO.add_event_callback(pin, receiver.signal)

        logger.info("Running.")
        # Poll in short sleeps; the signal handlers set `shutdown`.
        while not shutdown.is_set():
            time.sleep(0.1)
        logger.info("Shutting down...")
    finally:
        shutdown.set()
        GPIO.cleanup()

    # Grace period before returning -- presumably to let worker threads
    # notice the shutdown event and finish.
    time.sleep(5)


# Start the controller only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment