Skip to content

Instantly share code, notes, and snippets.

@acedrew
Created December 20, 2023 16:07
Show Gist options
  • Save acedrew/786b151d3d32998079a953ea138d6e38 to your computer and use it in GitHub Desktop.
Save acedrew/786b151d3d32998079a953ea138d6e38 to your computer and use it in GitHub Desktop.
Streaming Audio Devices to an ESP32
import sounddevice as sd
import numpy as np
import requests
from monitorcontrol import get_monitors
from multiprocessing import Process
import socket
# seconds
MODULUS = 5
last_in_values = [0.0 for x in range(MODULUS)]
last_out_values = [0.0 for x in range(MODULUS)]
i = 0
devices = sd.query_devices()
# [print(device['name'], device['hostapi']) for device in devices]
APIS = sd.query_hostapis()
MME_API, = [i for i, api in enumerate(APIS) if api['name'] == 'MME']
LOOPBACK_DEVICE, = [i for i, device in enumerate(devices) if device['name'] == 'In 1-2 (MOTU M Series)' and device['hostapi'] == MME_API]
# for monitor in get_monitors():
# with monitor:
# capabilities = monitor.get_vcp_capabilities()
# if capabilities.get('model') == 'VA27DQSB':
# monitor.set_input_source('DP1')
def callback_in(indata, frames, time, status):
global last_in_values
global i
# print(dir(time))
in_value = abs(np.max(indata) - np.min(indata))
# last_in_values.pop()
# last_in_values = [in_value] + last_in_values
last_in_values[i%MODULUS] = in_value
i = i + 1
# print(r.json()['revision'])
if status:
print(status)
def callback_out(outdata, frames, time, status):
global last_out_values
global i
# print(dir(time))
out_value = abs(np.max(outdata) - np.min(outdata))
last_out_values[i%MODULUS] = out_value
i = i + 1
# print(r.json()['revision'])
if status:
print(status)
def run_input():
with sd.InputStream(channels=1, latency='low', callback=callback_in):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
while True:
sd.sleep(int(10))
sock.sendto(bytes([min(255, int(20 + (np.mean(last_in_values)*255)))]), ("192.168.9.139", 9999))
sock.sendto(bytes([min(255, int((((1.2 + np.mean(last_in_values))**7.2) * 20)))]), ("192.168.9.138", 9999))
def run_output():
with sd.InputStream(channels=1, latency='low', callback=callback_out, device=LOOPBACK_DEVICE):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
while True:
sd.sleep(int(10))
sock.sendto(bytes([min(255, int(5 + (((1.1 + np.mean(last_out_values))**3.2) * 5)))]), ("192.168.9.141", 9998))
if __name__ == '__main__':
p1 = Process(target=run_input)
p2 = Process(target=run_output)
p1.start()
p2.start()
p1.join()
p2.join()
# with sd.InputStream(channels=1, latency='low', callback=callback_in):
# with sd.InputStream(channels=1, latency='low', callback=callback_out, device=LOOPBACK_DEVICE):
# with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
# while True:
# sd.sleep(int(10))
# # weighted_values = [(MODULUS - i) * value * (1/MODULUS) for i, value in enumerate(last_in_values)]
# sock.sendto(bytes([min(255, int(20 + (np.mean(last_in_values)*255)))]), ("192.168.9.139", 9999))
# sock.sendto(bytes([min(255, int((((1.2 + np.mean(last_in_values))**7.2) * 20)))]), ("192.168.9.138", 9999))
# # sock.sendto(bytes([min(255, int((((1.2 + np.mean(weighted_values))**7.2) * 20)))]), ("192.168.9.138", 9999))
# # sock.sendto(bytes([min(255, int(20 + (np.mean(last_in_values)*355)))]), ("192.168.9.140", 9999))
# sock.sendto(bytes([min(255, int(5 + (((1.1 + np.mean(last_out_values))**3.2) * 5)))]), ("192.168.9.141", 9998))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment