Skip to content

Instantly share code, notes, and snippets.

@jjaychen1e
Created May 19, 2021 14:23
Show Gist options
  • Save jjaychen1e/161a3e8f1147b61a20040535004bef10 to your computer and use it in GitHub Desktop.
Save jjaychen1e/161a3e8f1147b61a20040535004bef10 to your computer and use it in GitHub Desktop.
respeakerd_with_yeelight_demo.py
# -*- coding: utf-8 -*-
import os
import time
import logging
import signal
import threading
from respeakerd_source import RespeakerdSource
# from respeakerd_volume_ctl import VolumeCtl
from avs.alexa import Alexa
import sys
import mraa
from pixel_ring import pixel_ring
import socket
import time
import fcntl
import re
import os
import errno
import struct
from threading import Thread
from time import sleep
from collections import OrderedDict
# --- Shared discovery state --------------------------------------------------
# detected_bulbs: bulb IP -> [bulb_id, model, power, bright, rgb, port]
# bulb_idx2ip:    bulb_id (assigned starting at 1) -> bulb IP
detected_bulbs = {}
bulb_idx2ip = {}
DEBUGGING = False
# Set to False by main() to ask bulbs_detection_loop() to stop.
RUNNING = True
# Last command id handed out by next_cmd_id().
current_command_id = 0
# Multicast group used for bulb discovery (together with UDP port 1982).
MCAST_GRP = '239.255.255.250'

# Socket used to send search requests and to read the replies to them.
scan_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
fcntl.fcntl(scan_socket, fcntl.F_SETFL, os.O_NONBLOCK)

# Socket bound to port 1982 that joins the multicast group and listens
# passively for messages sent to it.
listen_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
listen_socket.bind(("", 1982))
fcntl.fcntl(listen_socket, fcntl.F_SETFL, os.O_NONBLOCK)
# "=4sl" uses standard sizes without padding, so the request is always the
# 8-byte (group address, interface address) pair. The native "4sl" format
# pads and widens the long on 64-bit builds, producing 16 bytes instead.
mreq = struct.pack("=4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
listen_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
def _drain_socket(sock):
    '''
    Pass every datagram currently queued on the non-blocking socket `sock`
    to handle_search_response().

    Returns when a read reports EAGAIN/EWOULDBLOCK (nothing left to read).
    Any other socket error is printed and ends the calling thread through
    sys.exit(1).
    '''
    while True:
        try:
            data = sock.recv(2048)
        except socket.error as e:
            err = e.args[0]
            if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
                return
            print(e)
            sys.exit(1)
        handle_search_response(data)


def bulbs_detection_loop():
    '''
    Thread body: periodically send a search request and process everything
    received on both discovery sockets.

    Runs until the module-level RUNNING flag becomes false. Both sockets are
    closed when the loop ends, whether it stops normally or through an error.
    '''
    print("bulbs_detection_loop running")
    search_interval = 30000  # milliseconds between search requests
    read_interval = 100      # milliseconds between polls of the sockets
    time_elapsed = 0
    try:
        while RUNNING:
            if time_elapsed % search_interval == 0:
                send_search_broadcast()
            # replies to our own search requests
            _drain_socket(scan_socket)
            # messages received passively on the multicast listener
            _drain_socket(listen_socket)
            time_elapsed += read_interval
            sleep(read_interval / 1000.0)
    finally:
        # Release the sockets even if the loop was left via sys.exit()
        # or an unexpected exception.
        scan_socket.close()
        listen_socket.close()
def send_search_broadcast():
    '''
    Send one search request to the multicast group on port 1982.
    The request is sent and forgotten; no reply is read here.
    '''
    print("send search request")
    request = (
        "M-SEARCH * HTTP/1.1\r\n"
        "HOST: 239.255.255.250:1982\r\n"
        "MAN: \"ssdp:discover\"\r\n"
        "ST: wifi_bulb"
    )
    destination = (MCAST_GRP, 1982)
    scan_socket.sendto(request, destination)
# Compiled once at import time instead of once per received datagram.
# Group 1 is the bulb's IPv4 address, group 3 its port.
_LOCATION_RE = re.compile(
    r"Location.*yeelight[^0-9]*([0-9]{1,3}(\.[0-9]{1,3}){3}):([0-9]*)")


def handle_search_response(data):
    '''
    Parse one discovery message and record the bulb it describes.

    `data` is the text of a received datagram. If it carries no
    "Location ... yeelight ... ip:port" field the message is reported as
    invalid and ignored. Otherwise the bulb's entry in detected_bulbs is
    created or refreshed; a bulb seen for the first time gets the next free
    index, a known bulb keeps its existing one.
    '''
    match = _LOCATION_RE.search(data)
    if match is None:
        print("invalid data received: " + data)
        return

    host_ip = match.group(1)
    host_port = match.group(3)
    if host_ip in detected_bulbs:
        bulb_id = detected_bulbs[host_ip][0]
    else:
        bulb_id = len(detected_bulbs) + 1

    model = get_param_value(data, "model")
    power = get_param_value(data, "power")
    bright = get_param_value(data, "bright")
    rgb = get_param_value(data, "rgb")

    # use two dictionaries to store index->ip and ip->bulb map
    detected_bulbs[host_ip] = [bulb_id, model, power, bright, rgb, host_port]
    bulb_idx2ip[bulb_id] = host_ip
def get_param_value(data, param):
    '''
    Return the value of the "param: value" line in `data`.

    The value is the run of printable ASCII characters that follows the
    colon on the same line. Returns "" when no such line exists or when the
    value is empty.

    `param` is matched literally and must be the whole field name at the
    start of a line, so "mode" does not match a "color_mode" line.
    '''
    # Only spaces/tabs are skipped after the colon: skipping "\r\n" as well
    # would make an empty value swallow the following line.
    pattern = r"^[ \t]*" + re.escape(param) + r":[ \t]*([ -~]*)"
    match = re.search(pattern, data, re.MULTILINE)
    if match is None:
        return ""
    return match.group(1)
def toggle_bulb(idx):
    '''
    Send the "toggle" command, without parameters, to the bulb with index idx.
    '''
    no_params = ""
    operate_on_bulb(idx, "toggle", no_params)
def operate_on_bulb(idx, method, params):
    '''
    Send one command to the bulb with index `idx`; no guarantee of success.

    `method` is the command name. `params` must already be compiled into a
    single string that is placed between the brackets of the "params" array,
    e.g. params="1"; params="\\"smooth\\""; params="1,\\"smooth\\",80".

    An unknown index is reported and ignored. Connection or send failures
    are printed, not raised. The TCP socket is always closed before
    returning.
    '''
    if idx not in bulb_idx2ip:
        print("error: invalid bulb idx")
        return

    bulb_ip = bulb_idx2ip[idx]
    port = detected_bulbs[bulb_ip][5]
    tcp_socket = None
    try:
        tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        print("connect  %s %s ..." % (bulb_ip, port))
        tcp_socket.connect((bulb_ip, int(port)))
        # The command id is taken only after a successful connect.
        msg = '{"id":%s,"method":"%s","params":[%s]}\r\n' % (
            next_cmd_id(), method, params)
        # sendall() keeps writing until the whole message has been sent;
        # send() may stop after a partial write.
        tcp_socket.sendall(msg)
    except Exception as e:
        print("Unexpected error: %s" % (e,))
    finally:
        # Close the socket on the failure paths too, so it does not leak.
        if tcp_socket is not None:
            tcp_socket.close()
def next_cmd_id():
    '''
    Advance the module-level command counter by one and return its new value.
    '''
    global current_command_id
    current_command_id = current_command_id + 1
    return current_command_id
def main():
    '''
    Run the demo: discover bulbs in a background thread and toggle bulb 1
    every time the hotword is detected, until SIGINT is received.

    The detection thread is always asked to stop and joined before this
    function returns or raises, so a failure during setup cannot leave the
    process hanging on that thread.
    '''
    global RUNNING

    detection_thread = Thread(target=bulbs_detection_loop)
    detection_thread.start()
    try:
        logging.basicConfig(level=logging.DEBUG)
        #logging.getLogger('avs.alexa').setLevel(logging.INFO)
        logging.getLogger('hpack.hpack').setLevel(logging.INFO)

        # GPIO 12 is driven low while the demo runs and high again on exit.
        # NOTE(review): presumably an enable line of the board - confirm.
        en = mraa.Gpio(12)
        if os.geteuid() != 0:
            # NOTE(review): delay applied only when not running as root;
            # the reason is not visible in this file - confirm.
            time.sleep(1)
        en.dir(mraa.DIR_OUT)
        en.write(0)

        src = RespeakerdSource()
        # alexa = Alexa()
        # ctl = VolumeCtl()
        # src.link(alexa)
        # pixel_ring.think()

        def on_detected(dir, index):
            # Parameter names are kept as-is: the caller lives outside this
            # file. `state` and `last_dir` are published as module globals.
            global state
            global last_dir
            logging.info('detected hotword:{} at {}`'.format(index, dir))
            state = 'detected'
            # Shift the reported direction by -60 degrees (mod 360) before
            # handing it to the pixel ring.
            last_dir = (dir + 360 - 60) % 360
            print('detected hotword:{} at {}`'.format(index, dir))
            # alexa.listen()
            print(bulb_idx2ip)
            toggle_bulb(1)
            pixel_ring.wakeup(last_dir)

        def on_vad():
            # when someone is talking
            # print("."),
            sys.stdout.flush()

        def on_silence():
            # when it is silent
            pass

        src.set_callback(on_detected)
        src.set_vad_callback(on_vad)
        src.set_silence_callback(on_silence)
        src.recursive_start()
        src.on_cloud_ready()

        is_quit = threading.Event()

        def signal_handler(signum, frame):
            print('Quit')
            is_quit.set()

        signal.signal(signal.SIGINT, signal_handler)

        # Idle in the main thread until SIGINT sets the event.
        while not is_quit.is_set():
            time.sleep(1)

        src.recursive_stop()
    finally:
        # Stop the detection thread on every exit path.
        RUNNING = False
        detection_thread.join()
    en.write(1)
# Start the demo only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment