Skip to content

Instantly share code, notes, and snippets.

@michael-lazar
Created September 18, 2017 19:58
Show Gist options
  • Save michael-lazar/dfaceb156af013c8dcfa31c60fc81ab9 to your computer and use it in GitHub Desktop.
Save michael-lazar/dfaceb156af013c8dcfa31c60fc81ab9 to your computer and use it in GitHub Desktop.
"""
Synchronize Hue lights to music recorded in real-time from an external device.
"""
import warnings
import logging
import time
from threading import Thread
import phue
import sounddevice as sd
import numpy as np
try:
import matplotlib.pyplot as plt
except ImportError:
pass
logging.basicConfig(level=logging.DEBUG)
_logger = logging.getLogger('sync_music')

# Address and username passed to phue.Bridge.
# NOTE(review): the username looks like a bridge API credential -- avoid
# committing a real value to a public repository.
HUE_IP = '192.168.0.13'
HUE_USERNAME = 'xfX978T2lfBdfcOdZ8scFmxFr3Az7ZsVUVCxvscY'

# One entry per light driven by the audio:
#   name  -- light name used to look the light up on the bridge
#   range -- [low, high] frequency band; must lie within [0, SAMPLERATE / 2]
#   amp   -- [min, max] log-level window that is normalized to [0, 1]
LIGHTS = [
    {'name': 'Backyard', 'range': [0, 200], 'amp': [10, 50]},
    # {'name': 'Living Room 2', 'range': [2000, 4000], 'amp': [-20, 30]},
    {'name': 'Living Room 2', 'range': [0, 2000], 'amp': [10, 50]},
]
BLOCK_DURATION = 100  # Milliseconds, 100ms is the fastest recommended for hue
FFT_SIZE = 4096
SAMPLERATE = 8000
#SAMPLERATE = sd.query_devices(sd.default.device, 'input')['default_samplerate']

_logger.info('Samplerate %s', SAMPLERATE)
_logger.info('Block duration %s ms', BLOCK_DURATION)
_logger.info('FFT size %s', FFT_SIZE)


def validate_lights(lights, samplerate):
    """Validate the frequency band and amplitude window of every light.

    Explicit exceptions are used instead of ``assert`` so the checks still
    run when Python is started with ``-O``.

    Args:
        lights: Iterable of light dicts with 'name', 'range' and 'amp' keys.
        samplerate: Sampling rate; bands may not exceed half of it.

    Raises:
        ValueError: if a band is negative, empty, inverted or above
            ``samplerate / 2``, or if the two 'amp' bounds are equal (which
            would make the normalization divide by zero).
    """
    upper_limit = samplerate / 2
    for light in lights:
        low, high = light['range'][0], light['range'][1]
        if not 0 <= low < high <= upper_limit:
            raise ValueError(
                'Invalid range %r for light %r: expected 0 <= low < high <= %s'
                % (light['range'], light['name'], upper_limit))
        if light['amp'][0] == light['amp'][1]:
            raise ValueError(
                'Invalid amp %r for light %r: bounds must differ'
                % (light['amp'], light['name']))


validate_lights(LIGHTS, SAMPLERATE)
# Connect to the bridge and fetch its lights keyed by name; the main loop
# looks lights up in this mapping by their configured 'name'.
b = phue.Bridge(HUE_IP, HUE_USERNAME)
hue_lights = b.get_light_objects('name')

# Surface configuration typos at startup: a name missing from the bridge would
# otherwise only fail later, once per audio block, inside the main loop.
missing_lights = [entry['name'] for entry in LIGHTS
                  if entry['name'] not in hue_lights]
if missing_lights:
    _logger.warning('Lights not found on the bridge: %s',
                    ', '.join(missing_lights))

_logger.info('Starting stream')
# Number of samples per block, derived from the block duration in ms.
blocksize = int(BLOCK_DURATION * SAMPLERATE / 1000)
stream = sd.InputStream(samplerate=SAMPLERATE, blocksize=blocksize,
                        device=sd.default.device, channels=1, latency='low')
stream.start()
def swap_colors():
    """Alternate two colours between 'Backyard' and 'Living Room 2' forever.

    Every 10 seconds the two lights exchange their ``xy`` colour; each change
    is sent with a ``transitiontime`` of 50. The function never returns, so
    it is meant to be run in a background thread.

    Errors raised while talking to the bridge are logged and the cycle is
    retried after a short pause. Previously they were silently discarded and
    retried immediately, which hid the cause and busy-looped while the bridge
    was unreachable.
    """
    first_xy = [0.591, 0.3833]
    second_xy = [0.2642, 0.1051]
    hold_seconds = 10
    retry_seconds = 1

    def command(xy):
        # Build a fresh dict per request so no state is shared between calls.
        return {'xy': list(xy), 'transitiontime': 50}

    while True:
        try:
            b.set_light('Backyard', command(first_xy))
            b.set_light('Living Room 2', command(second_xy))
            time.sleep(hold_seconds)
            b.set_light('Living Room 2', command(first_xy))
            b.set_light('Backyard', command(second_xy))
            time.sleep(hold_seconds)
        except Exception:
            # Best effort: keep the swapper alive, but record what went wrong
            # and back off so a persistent failure does not spin the CPU.
            _logger.exception('Failed to swap light colors, retrying')
            time.sleep(retry_seconds)
# Offset added to the normalized level before it is clamped to [0, 1].
BRIGHTNESS_OFFSET = 0.6


def _band_brightness(magnitude, light):
    """Return the Hue brightness (1-254) for one light's frequency band.

    Args:
        magnitude: FFT magnitude spectrum of the current audio block, as
            produced by ``np.abs(np.fft.rfft(..., n=FFT_SIZE))``.
        light: One entry of ``LIGHTS`` ('name', 'range' and 'amp' keys).

    Raises:
        ValueError: if the light's frequency range maps to no FFT bins.
    """
    low_bin = int(light['range'][0] / SAMPLERATE * FFT_SIZE)
    high_bin = int(light['range'][1] / SAMPLERATE * FFT_SIZE)
    band = magnitude[low_bin:high_bin]
    if band.size == 0:
        # The mean of an empty slice is NaN, which cannot become a brightness.
        raise ValueError('Frequency range of light %r covers no FFT bins'
                         % light['name'])
    # np.log is the natural logarithm, so this is a logarithmic level rather
    # than a true decibel value; the 'amp' windows are applied to this scale.
    # A silent block gives log(0) == -inf, which clamps to the minimum below.
    with np.errstate(divide='ignore'):
        level = 10 * np.log(np.mean(band))
    _logger.debug('Light %s mag %s', light['name'], level)
    amp_min, amp_max = light['amp'][0], light['amp'][1]
    normalized = (level - amp_min) / (amp_max - amp_min)  # Normalize to [0, 1]
    normalized += BRIGHTNESS_OFFSET
    normalized = min(max(normalized, 0), 1)  # Cut off out of bounds
    return int((normalized * 253) + 1)  # Transform to [1, 254]


# Run the colour swapper in the background. It is marked as a daemon thread
# because it loops forever and would otherwise keep the process alive after
# the main loop has been interrupted.
thread = Thread(target=swap_colors)
thread.daemon = True
thread.start()

try:
    while True:
        try:
            data, overflowed = stream.read(blocksize)
            _logger.debug('Read %s samples', data.shape[0])
            if overflowed:
                warnings.warn('Buffer overflow')
            # Spectrum of the first (only) channel, computed with n=FFT_SIZE.
            magnitude = np.abs(np.fft.rfft(data[:, 0], n=FFT_SIZE))
        except Exception:
            # Best effort: skip this block, but log the cause and pause for
            # one block duration so a persistent failure does not busy-loop.
            _logger.exception('Failed to read or analyse audio block')
            time.sleep(BLOCK_DURATION / 1000.0)
            continue

        for light in LIGHTS:
            # Handle each light separately so that a failure on one light
            # does not prevent the remaining lights from being updated.
            try:
                brightness = _band_brightness(magnitude, light)
                hue_light = hue_lights[light['name']]
                hue_light.transitiontime = 1  # deciseconds
                hue_light.brightness = brightness
            except Exception:
                _logger.exception('Failed to update light %s', light['name'])
            # Debugging aid: plot the spectrum with the light's band marked.
            # fs = np.linspace(0, SAMPLERATE / 2, FFT_SIZE // 2 + 1)
            # plt.plot(fs, magnitude)
            # plt.axvline(light['range'][0], lw=2, color='r')
            # plt.axvline(light['range'][1], lw=2, color='r')
            # plt.show()
except KeyboardInterrupt:
    pass
finally:
    # Release the audio device however the loop ends, not only on Ctrl+C.
    _logger.info('Stopping stream')
    stream.stop()
    stream.close()
    _logger.info('Done!')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment