|
from photons_app.special import FoundSerials, HardCodedSerials, SpecialReference |
|
from photons_device_messages import DeviceMessages |
|
from photons_app.executor import library_setup |
|
from photons_script.script import ATarget |
|
from photons_colour import ColourMessages |
|
|
|
from delfick_logging import setup_logging |
|
import sounddevice as sd |
|
import argparse |
|
import logging |
|
import asyncio |
|
import time |
|
|
|
# Module-level logger used by the audio callback to report stream status.
log = logging.getLogger("visualize")
|
|
|
def resolve_reference(collector, reference, target):
    """Turn a user supplied light specifier into a reference object.

    Args:
        collector: object whose ``configuration`` mapping holds a
            ``"reference_resolver_register"`` entry.
        reference: ``None``, ``""`` or ``"_"`` to select ``FoundSerials()``;
            a ``"<type>:<options>"`` string to go through the resolver
            register; an existing ``SpecialReference``; or anything else,
            which is wrapped in ``HardCodedSerials``.
        target: passed through to the resolver register.

    Returns:
        The ``SpecialReference`` instance if one was given or produced,
        otherwise ``HardCodedSerials(reference)``.
    """
    selected = reference

    # No explicit choice: fall back to FoundSerials.
    if selected in (None, "", "_"):
        selected = FoundSerials()

    # Only exact ``str`` values of the form "<type>:<options>" are resolved.
    if type(selected) is str and ":" in selected:
        kind, _, rest = selected.partition(":")
        register = collector.configuration["reference_resolver_register"]
        selected = register.resolve(kind, rest, target)

    if isinstance(selected, SpecialReference):
        return selected

    return HardCodedSerials(selected)
|
|
|
class HueItr:
    """Endless iterator that sweeps a hue value between 0 and 360.

    Starting from 0 the value climbs one step per ``next()`` until it reaches
    360, then descends one step at a time until it reaches 0, and repeats.
    The iterator never raises ``StopIteration``.
    """

    def __init__(self):
        # True while the sweep is ascending, False while descending.
        self.add = True
        # Most recently produced hue; the first next() call yields 1.
        self.hue = 0

    def __iter__(self):
        return self

    def __next__(self):
        """Advance the sweep by one step and return the new hue as an int."""
        if self.add:
            self.hue += 1
            if self.hue >= 360:
                # Hit the top: clamp and turn around.
                self.add = False
                self.hue = 360
        else:
            self.hue -= 1
            if self.hue <= 0:
                # Hit the bottom: clamp and turn around. This must be the
                # integer 0 (not False) so callers always receive a number.
                self.add = True
                self.hue = 0

        return self.hue
|
|
|
def make_parser():
    """Build the command-line parser for the visualizer.

    Returns:
        argparse.ArgumentParser: parser defining ``--list-devices``,
        ``--device``, ``--light``, ``--window``, ``--interval``,
        ``--blocksize``, ``--samplerate``, ``--downsample`` and the
        positional ``channels`` list.
    """
    cli = argparse.ArgumentParser(description="Music visualizer for strips")

    # (flags, keyword options) pairs, registered in the order they are listed
    # so the generated help keeps the same ordering.
    specs = (
        (
            ("--list-devices",),
            {"action": "store_true", "help": "show list of audio devices and exit"},
        ),
        (
            ("-d", "--device"),
            {"help": "device specifier"},
        ),
        (
            ("-l", "--light"),
            {"help": "the lights to change"},
        ),
        (
            ("-w", "--window"),
            {
                "type": float,
                "default": 200,
                "metavar": "DURATION",
                "help": "visible time slot (default: %(default)s ms)",
            },
        ),
        (
            ("-i", "--interval"),
            {
                "type": float,
                "default": 30,
                "help": "minimum time between plot updates (default: %(default)s ms)",
            },
        ),
        (
            ("-b", "--blocksize"),
            {"type": int, "help": "block size (in samples)"},
        ),
        (
            ("-r", "--samplerate"),
            {"type": float, "help": "sampling rate of audio device"},
        ),
        (
            ("-n", "--downsample"),
            {
                "type": int,
                "default": 10,
                "metavar": "N",
                "help": "display every Nth sample (default: %(default)s)",
            },
        ),
        (
            ("channels",),
            {
                "type": int,
                "default": [1],
                "nargs": "*",
                "metavar": "CHANNEL",
                "help": "input channels to plot (default: the first)",
            },
        ),
    )

    for flags, options in specs:
        cli.add_argument(*flags, **options)

    return cli
|
|
|
def main(argv=None):
    """Command-line entry point for the visualizer.

    Sets up logging, parses ``argv``, validates the channel numbers, handles
    ``--list-devices``, fills in a missing sample rate from the input
    device's default, and then runs ``main_async`` on a fresh event loop.

    Args:
        argv: argument list handed to ``parse_args``; ``None`` lets argparse
            read ``sys.argv``.
    """
    setup_logging()

    cli = make_parser()
    options = cli.parse_args(argv)

    # Channel numbers are 1-based, so anything below 1 is rejected.
    if not all(channel >= 1 for channel in options.channels):
        cli.error('argument CHANNEL: must be >= 1')

    if options.list_devices:
        print(sd.query_devices())
        cli.exit(0)

    if options.samplerate is None:
        # Fall back to what the chosen input device reports as its default.
        options.samplerate = sd.query_devices(options.device, 'input')['default_samplerate']

    # NOTE(review): the event loop created here is never closed.
    try:
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        event_loop.run_until_complete(main_async(options))
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the visualizer.
        pass
|
|
|
async def main_async(args):
    """Capture audio and drive the selected lights from it.

    The sounddevice callback pushes ``(timestamp, samples)`` tuples onto a
    LIFO queue. The main loop pops the most recently queued entry and, no
    more often than once per 0.1 seconds, sends a ``SetColor`` whose
    brightness follows the peak sample of that block and whose hue comes
    from a ``HueItr`` sweep.

    Args:
        args: namespace produced by ``make_parser()``; this coroutine reads
            ``channels``, ``downsample``, ``device``, ``samplerate`` and
            ``light``.

    The ``while True`` loop has no exit condition, so the coroutine only ends
    when an exception propagates or it is cancelled/interrupted.
    """
    loop = asyncio.get_event_loop()
    queue = asyncio.LifoQueue()

    collector = library_setup()
    lan_target = collector.configuration['target_register'].resolve("lan")

    # Channel numbers start with 1
    mapping = [c - 1 for c in args.channels]

    def callback(indata, frames, tm, status):
        """Hand each captured block over to the event loop."""
        if status:
            log.info(status)

        # Fancy indexing with mapping creates a (necessary!) copy.
        # The queue is only touched via call_soon_threadsafe; presumably
        # sounddevice invokes this callback from its own audio thread.
        loop.call_soon_threadsafe(
            queue.put_nowait,
            (time.time(), indata[::args.downsample, mapping]),
        )

    stream = sd.InputStream(
        device=args.device,
        channels=max(args.channels),
        samplerate=args.samplerate,
        callback=callback,
    )

    info = {"last_update": time.time(), "hue": iter(HueItr())}
    reference = resolve_reference(collector, args.light, lan_target)

    async with ATarget(lan_target) as afr:
        # Make sure our chosen device exists
        await reference.find(afr, lan_target.default_broadcast, 20)

        # Turn the lights on
        await lan_target.script(DeviceMessages.SetPower(level=65535)).run_with_all(reference, afr)

        async def update(data):
            """Send one colour update derived from a block of samples."""
            # Advance the hue sweep exactly once per update and use that value.
            hue = next(info["hue"])

            # Peak sample across every selected channel. ndarray.max() is
            # used rather than the builtin max(), which compares whole rows
            # and fails when more than one channel is selected.
            # (assumes data is the 2-D numpy block produced by the callback)
            max_amp = float(data.max()) * 500

            # Clamp to 1..100 so brightness stays within 0.01..1.0.
            max_amp = min(100, max(1, max_amp))

            msg = ColourMessages.SetColor(
                hue=hue,
                saturation=0.5,
                brightness=max_amp / 100,
                res_required=False,
            )
            await lan_target.script(msg).run_with_all(reference, afr, error_catcher=[])

        with stream:
            while True:
                t, nxt = await queue.get()

                # Blocks captured before the last update are dropped.
                if info["last_update"] < t:
                    nxt_update = info["last_update"] + 0.1
                    if t < nxt_update:
                        # Too soon since the last update: put the block back
                        # once the 0.1 s gap has elapsed.
                        # NOTE(review): the block is re-queued with its
                        # original timestamp t - confirm this is intended.
                        loop.call_later(nxt_update - time.time(), queue.put_nowait, (t, nxt))
                    else:
                        info["last_update"] = time.time()
                        await update(nxt)
|
|
|
# Run the visualizer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()