Skip to content

Instantly share code, notes, and snippets.

@delfick
Last active June 13, 2018 18:54
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save delfick/0ad394471d6d7dc3762e3cce923484bb to your computer and use it in GitHub Desktop.
Save delfick/0ad394471d6d7dc3762e3cce923484bb to your computer and use it in GitHub Desktop.

This is an implementation of https://github.com/HoldenGs/LIFX_controller using the photons library

To run it, create a virtualenv, run `pip install -r requirements.txt`, and then run `python visualise.py`.

By default it will find all the lights on the network, but you can choose specific lights by passing `-l` or `--light` with a photons reference (see https://delfick.github.io/photons-core/lifx_script.html#references).

For example, to apply the visualisation to all the strips on your network, run `python visualise.py -l match:cap=multizone`.

Note that this implementation sets a single colour across the whole strip; it does not set individual zones.

lifx-photons-core==0.5.4
sounddevice==0.3.11
numpy==1.14.5
from photons_app.special import FoundSerials, HardCodedSerials, SpecialReference
from photons_device_messages import DeviceMessages
from photons_app.executor import library_setup
from photons_script.script import ATarget
from photons_colour import ColourMessages
from delfick_logging import setup_logging
import sounddevice as sd
import argparse
import logging
import asyncio
import time
# Module-level logger; the audio stream callback uses it to report status flags
log = logging.getLogger("visualize")
def resolve_reference(collector, reference, target):
    """
    Turn the ``--light`` value into an object describing which lights to use.

    * ``None``, ``""`` and ``"_"`` become a ``FoundSerials()`` instance.
    * A plain ``str`` containing a colon is split once on the first ``:`` into
      ``(typ, options)`` and handed to the ``reference_resolver_register``
      found in ``collector.configuration``, along with ``target``.
    * Whatever we end up with is returned unchanged if it is already a
      ``SpecialReference``; anything else is wrapped in ``HardCodedSerials``.
    """
    resolved = reference

    if resolved in (None, "", "_"):
        resolved = FoundSerials()
    elif type(resolved) is str and ":" in resolved:
        # Only the first colon separates the resolver name from its options
        typ, options = resolved.split(":", 1)
        register = collector.configuration["reference_resolver_register"]
        resolved = register.resolve(typ, options, target)

    if isinstance(resolved, SpecialReference):
        return resolved

    return HardCodedSerials(resolved)
class HueItr:
    """
    Endless iterator that sweeps a hue value up and down between 0 and 360.

    Starting from 0, each ``next()`` moves the hue by one: upwards until it
    reaches 360, then downwards until it reaches 0, and so on forever. The
    first value yielded is 1. Every value yielded is a plain ``int``.
    """

    def __init__(self):
        # True while we are counting up towards 360, False while counting down
        self.add = True
        self.hue = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.add:
            self.hue += 1
            if self.hue >= 360:
                # Reached the top, clamp and start heading back down
                self.add = False
                self.hue = 360
        else:
            self.hue -= 1
            if self.hue <= 0:
                # Reached the bottom, clamp to the integer 0 (not a bool) and
                # start heading back up
                self.add = True
                self.hue = 0
        return self.hue
def make_parser():
    """
    Build the command line parser for the visualiser.

    Options are registered in the same order they appear in ``--help``. The
    ``channels`` positional takes zero or more integers and falls back to
    ``[1]`` when none are given.

    Returns the configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(description="Music visualizer for strips")
    add = parser.add_argument

    add("--list-devices", action="store_true", help="show list of audio devices and exit")
    add("-d", "--device", help="device specifier")
    add("-l", "--light", help="the lights to change")

    add(
        "-w",
        "--window",
        type=float,
        default=200,
        metavar="DURATION",
        help="visible time slot (default: %(default)s ms)",
    )
    add(
        "-i",
        "--interval",
        type=float,
        default=30,
        help="minimum time between plot updates (default: %(default)s ms)",
    )

    add("-b", "--blocksize", type=int, help="block size (in samples)")
    add("-r", "--samplerate", type=float, help="sampling rate of audio device")
    add(
        "-n",
        "--downsample",
        type=int,
        default=10,
        metavar="N",
        help="display every Nth sample (default: %(default)s)",
    )

    add(
        "channels",
        type=int,
        default=[1],
        nargs="*",
        metavar="CHANNEL",
        help="input channels to plot (default: the first)",
    )

    return parser
def main(argv=None):
    """
    Command line entry point.

    Sets up logging, parses ``argv`` (``None`` means argparse falls back to
    ``sys.argv``), validates the requested channels and then runs
    ``main_async`` on a freshly created event loop.

    With ``--list-devices`` the available audio devices are printed and the
    parser exits with status 0. When no sample rate was given, the input
    device's ``default_samplerate`` is used. A ``KeyboardInterrupt`` raised
    while the loop is running is swallowed so that Ctrl-C exits quietly.
    """
    setup_logging()

    parser = make_parser()
    options = parser.parse_args(argv)

    # Channel numbers on the command line are 1-based
    if not all(channel >= 1 for channel in options.channels):
        parser.error('argument CHANNEL: must be >= 1')

    if options.list_devices:
        print(sd.query_devices())
        parser.exit(0)

    if options.samplerate is None:
        options.samplerate = sd.query_devices(options.device, 'input')['default_samplerate']

    try:
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        event_loop.run_until_complete(main_async(options))
    except KeyboardInterrupt:
        pass
async def main_async(args):
    """
    Stream audio from the input device and drive the lights from its amplitude.

    ``args`` is the namespace produced by the command line parser. This
    coroutine reads ``channels``, ``downsample``, ``device``, ``samplerate``
    and ``light`` from it.

    The audio callback pushes ``(timestamp, samples)`` tuples onto a LIFO
    queue, so the most recently captured block is handled first. The lights are
    sent a new colour at most once every 0.1 seconds: the hue moves one step
    along a ``HueItr`` sweep per update and the brightness follows the peak
    sample of the first selected channel.

    The processing loop never finishes by itself; the coroutine only ends when
    an exception or cancellation propagates out of it.
    """
    loop = asyncio.get_event_loop()
    queue = asyncio.LifoQueue()

    collector = library_setup()
    lan_target = collector.configuration['target_register'].resolve("lan")

    # Channel numbers start with 1
    mapping = [c - 1 for c in args.channels]

    def callback(indata, frames, tm, status):
        """Hand each captured audio block over to the event loop's queue."""
        if status:
            log.info(status)
        # Fancy indexing with mapping creates a (necessary!) copy.
        # The put is scheduled with call_soon_threadsafe rather than done
        # directly - presumably because sounddevice invokes this callback
        # from a different thread than the event loop.
        loop.call_soon_threadsafe(queue.put_nowait, (time.time(), indata[::args.downsample, mapping]))

    stream = sd.InputStream(
          device = args.device
        , channels = max(args.channels)
        , samplerate = args.samplerate
        , callback = callback
        )

    info = {"last_update": time.time(), "hue": iter(HueItr())}
    reference = resolve_reference(collector, args.light, lan_target)

    async with ATarget(lan_target) as afr:
        # Make sure our chosen device exists
        # NOTE(review): the 20 is presumably a timeout in seconds - confirm against photons
        await reference.find(afr, lan_target.default_broadcast, 20)

        # Turn the lights on
        await lan_target.script(DeviceMessages.SetPower(level=65535)).run_with_all(reference, afr)

        async def update(data):
            """Send one colour change derived from a block of samples."""
            # Take exactly one step along the hue sweep per update
            hue = next(info["hue"])

            # Peak of the first selected channel. Indexing the column keeps
            # this working when more than one channel was requested.
            max_amp = data[:, 0].max() * 500

            # Keep the brightness percentage within 1..100
            if max_amp > 100:
                max_amp = 100
            if max_amp < 1:
                max_amp = 1

            msg = ColourMessages.SetColor(hue=hue, saturation=0.5, brightness=max_amp/100, res_required=False)
            await lan_target.script(msg).run_with_all(reference, afr, error_catcher=[])

        with stream:
            while True:
                t, nxt = await queue.get()

                # Blocks captured before the last update are stale, drop them
                if info["last_update"] < t:
                    nxt_update = info["last_update"] + 0.1
                    if t < nxt_update:
                        # Too soon after the last update; put the block back
                        # on the queue once the 0.1 second gap has passed
                        loop.call_later(nxt_update - time.time(), queue.put_nowait, (t, nxt))
                    else:
                        info["last_update"] = time.time()
                        await update(nxt)
# Only start the visualiser when this file is executed as a script
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment