Skip to content

Instantly share code, notes, and snippets.

@tjoen
Last active October 21, 2018 08:30
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save tjoen/9851436cedb68f6d6a08be14067c85f4 to your computer and use it in GitHub Desktop.
Save tjoen/9851436cedb68f6d6a08be14067c85f4 to your computer and use it in GitHub Desktop.
button.py
# Mycroft quick and dirty led and button tester
#
# Put mbus.py, button.py and lights.py in /home/pi
# Run python lights.py in another terminal
# or better: add it to your autorun.sh like this:
# python /home/pi/lights.py </dev/null &>/dev/null &
# just before:
# else
# # running from a SSH session
# echo "Remote session"
# fi
#
# Todo: make proper daemon
# quit using ctrl + \
#
# A short press sends a stop; a press held for 2 seconds or longer starts mic listening
import subprocess
import time
import RPi.GPIO as GPIO
from subprocess import call
gpio_pin = 23  # BCM number of the GPIO pin the button is attached to
longpress_threshold = 2  # Seconds; a press held at least this long is a "long" press

GPIO.setmode(GPIO.BCM)
# Internal pull-up: the input reads high while idle and low while pressed.
GPIO.setup(gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

try:
    while True:
        time.sleep(0.2)
        if GPIO.input(gpio_pin):
            # Still high: the button is not pressed, keep polling.
            continue

        # The press started; poll until the button is released again.
        print("Started press")
        press_start = time.time()
        while not GPIO.input(gpio_pin):
            time.sleep(0.2)
        pressed_time = time.time() - press_start
        print("Button pressed %d" % pressed_time)

        # Short press -> stop, long press -> start listening.
        if pressed_time < longpress_threshold:
            message_type = "mycroft.stop"
        else:
            message_type = "mycroft.mic.listen"
        call(['python', "/home/pi/mbus.py", "localhost", message_type])
finally:
    # Release the pin when the loop is left (e.g. on Ctrl+C).
    GPIO.cleanup(gpio_pin)
import websocket
import time
import itertools
import threading
import time
import RPi.GPIO as GPIO
import json
import subprocess
def on_message(ws, message):
    """Decode a JSON bus message, print it and update the LED for known types.

    Recording start turns the LED on, audio output start switches it to the
    beacon pattern, and the end of either turns it off. Any other message
    type leaves the LED untouched.
    """
    payload = json.loads(message)
    print(payload)

    kind = payload['type']
    if kind == 'recognizer_loop:record_begin':
        new_state = LED.ON
    elif kind == 'recognizer_loop:audio_output_start':
        new_state = LED.BEACON
    elif kind in ('recognizer_loop:record_end',
                  'recognizer_loop:audio_output_end'):
        new_state = LED.OFF
    else:
        return
    my_led.set_state(new_state)
def on_error(ws, error):
    """Print the error passed to this websocket error callback."""
    print(error)
def on_close(ws, close_status_code=None, close_msg=None):
    """Print a marker when the websocket connection is closed.

    The two optional parameters accept the close status code and close
    message that newer websocket-client releases pass to this callback;
    older releases call it with `ws` only. Both are ignored.
    """
    print("### closed ###")
class LED:
    """Starts a background thread to show patterns with the LED.

    Simple usage:
        my_led = LED(channel = 25)
        my_led.start()
        my_led.set_state(LED.BEACON)
        my_led.stop()
    """

    # Pattern identifiers accepted by set_state().
    OFF = 0
    ON = 1
    BLINK = 2
    BLINK_3 = 3
    BEACON = 4
    BEACON_DARK = 5
    DECAY = 6
    PULSE_SLOW = 7
    PULSE_QUICK = 8

    def __init__(self, channel):
        """Set up `channel` (BCM numbering) as a PWM output for the LED."""
        self.animator = threading.Thread(target=self._animate)
        self.channel = channel
        self.iterator = None   # endless duty-cycle sequence of the pattern
        self.running = False
        self.state = None      # pending state request, consumed by _animate
        self.sleep = 0         # seconds between two steps of the pattern
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(channel, GPIO.OUT)
        self.pwm = GPIO.PWM(channel, 100)
        # Created last: its presence tells __del__ that setup completed.
        self.lock = threading.Lock()

    def __del__(self):
        # __init__ may have failed part-way; only clean up a complete object.
        if getattr(self, 'lock', None) is None:
            return
        self.stop()
        GPIO.cleanup(self.channel)

    def start(self):
        """Start the LED driver."""
        with self.lock:  # pylint: disable=E1129
            if not self.running:
                if self.animator.ident is not None:
                    # A thread can only be started once; make a fresh one so
                    # the driver can be started again after stop().
                    self.animator = threading.Thread(target=self._animate)
                self.running = True
                self.pwm.start(0)  # off by default
                self.animator.start()

    def stop(self):
        """Stop the LED driver and sets the LED to off.

        Does nothing when the driver is not running.
        """
        with self.lock:  # pylint: disable=E1129
            was_running = self.running
            self.running = False
        if not was_running:
            return
        # Join outside the lock: the animator must take the lock to notice
        # that `running` was cleared, so joining while holding it would hang.
        self.animator.join()
        self.pwm.stop()

    def set_state(self, state):
        """Set the LED driver's new state.

        Note the LED driver must be started for this to have any effect.
        """
        with self.lock:  # pylint: disable=E1129
            self.state = state

    def _animate(self):
        """Thread body: apply requested states and step the active pattern.

        Returns once `running` is cleared. Raises ValueError when an
        unsupported state was requested.
        """
        while True:
            with self.lock:  # pylint: disable=E1129
                state = self.state
                self.state = None
                running = self.running
            if not running:
                return
            if state is not None:
                if not self._parse_state(state):
                    # %r so that a non-integer state is reported as-is.
                    raise ValueError('unsupported state: %r' % (state,))
            if self.iterator:
                self.pwm.ChangeDutyCycle(next(self.iterator))
                time.sleep(self.sleep)
            else:
                # We can also wait for a state change here with a Condition.
                time.sleep(1)

    def _parse_state(self, state):
        """Install the pattern for `state`.

        Resets `iterator` and `sleep`, then either sets a fixed duty cycle
        (OFF, ON) or installs an endless duty-cycle iterator together with
        its step delay. Returns True when `state` is supported, else False.
        """
        self.iterator = None
        self.sleep = 0.0
        if state == self.OFF:
            self.pwm.ChangeDutyCycle(0)
            return True
        if state == self.ON:
            self.pwm.ChangeDutyCycle(100)
            return True
        if state == self.BLINK:
            self.iterator = itertools.cycle([0, 100])
            self.sleep = 0.5
            return True
        if state == self.BLINK_3:
            self.iterator = itertools.cycle([0, 100] * 3 + [0, 0])
            self.sleep = 0.25
            return True
        if state == self.BEACON:
            self.iterator = itertools.cycle(
                itertools.chain([30] * 100, [100] * 8, range(100, 30, -5)))
            self.sleep = 0.05
            return True
        if state == self.BEACON_DARK:
            self.iterator = itertools.cycle(
                itertools.chain([0] * 100, range(0, 30, 3), range(30, 0, -3)))
            self.sleep = 0.05
            return True
        if state == self.DECAY:
            self.iterator = itertools.cycle(range(100, 0, -2))
            self.sleep = 0.05
            return True
        if state == self.PULSE_SLOW:
            self.iterator = itertools.cycle(
                itertools.chain(range(0, 100, 2), range(100, 0, -2)))
            self.sleep = 0.1
            return True
        if state == self.PULSE_QUICK:
            self.iterator = itertools.cycle(
                itertools.chain(range(0, 100, 5), range(100, 0, -5)))
            self.sleep = 0.05
            return True
        return False
# Module-level LED used by on_message(); it starts with the quick pulse.
my_led = LED(channel=25)
my_led.start()
my_led.set_state(LED.PULSE_QUICK)

# Run the button watcher as a separate process.
process = subprocess.Popen(['python', "/home/pi/button.py"])

if __name__ == "__main__":
    websocket.enableTrace(True)
    # Connect to the local bus and dispatch events to the callbacks above.
    ws = websocket.WebSocketApp(
        "ws://localhost:8181/core",
        on_close=on_close,
        on_error=on_error,
        on_message=on_message,
    )
    ws.run_forever()
#! /usr/bin/env python
# python mbus.py localhost "mycroft.stop"
import sys
from websocket import create_connection
import json


def build_message(msg_type, data=None):
    """Return the JSON text of a bus message of `msg_type`.

    `data` is the optional payload as given on the command line. When it
    parses as a JSON object it is sent as that object; any other text is
    sent as a plain string. Without `data` an empty object is sent.
    Serialising with json.dumps keeps the message valid JSON even when the
    type or the data contain quotes.
    """
    if data is None:
        payload = {}
    else:
        try:
            payload = json.loads(data)
        except ValueError:
            payload = data
        if not isinstance(payload, dict):
            # Not a JSON object: pass the text through unchanged.
            payload = data
    return json.dumps({"type": msg_type, "data": payload})


def main(argv):
    """Send one message to the bus on argv[1] and print the first reply.

    argv is [program, host, message type, optional data]. Returns the
    process exit status: 0 on success, 2 when arguments are missing.
    """
    if len(argv) < 3:
        sys.stderr.write("usage: python mbus.py HOST MESSAGE_TYPE [DATA]\n")
        return 2

    msg_type = argv[2]
    data = argv[3] if len(argv) >= 4 else None
    uri = 'ws://' + argv[1] + ':8181/core'

    ws = create_connection(uri)
    try:
        print("Sending " + msg_type + " to " + uri + "...")
        ws.send(build_message(msg_type, data))
        print("Receiving...")
        result = ws.recv()
        print("Received '%s'" % result)
    finally:
        # Close the socket even when sending or receiving fails.
        ws.close()
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment