Created
December 20, 2023 16:07
-
-
Save acedrew/786b151d3d32998079a953ea138d6e38 to your computer and use it in GitHub Desktop.
Streaming Audio Devices to an ESP32
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sounddevice as sd | |
import numpy as np | |
import requests | |
from monitorcontrol import get_monitors | |
from multiprocessing import Process | |
import socket | |
# Number of slots in each rolling level window; the stream callbacks write
# one slot per invocation, indexed by ``i % MODULUS``.
# NOTE(review): this was previously annotated "seconds", but the value is
# only ever used as a slot count -- confirm the intended unit.
MODULUS = 5

# Rolling windows of recent peak-to-peak levels, one per stream callback.
last_in_values = [0.0] * MODULUS
last_out_values = [0.0] * MODULUS

# Running callback counter used to pick the next window slot.
i = 0

devices = sd.query_devices()
# Debug aid -- list every device name with its host API index:
# [print(device['name'], device['hostapi']) for device in devices]
APIS = sd.query_hostapis()

# Single-target unpacking doubles as a sanity check: each lookup raises
# ValueError unless exactly one entry matches.
(MME_API,) = [
    api_index
    for api_index, api in enumerate(APIS)
    if api['name'] == 'MME'
]
(LOOPBACK_DEVICE,) = [
    device_index
    for device_index, device in enumerate(devices)
    if device['name'] == 'In 1-2 (MOTU M Series)'
    and device['hostapi'] == MME_API
]
# Disabled: switch the monitor whose model is 'VA27DQSB' to input 'DP1'.
# for monitor in get_monitors():
#     with monitor:
#         capabilities = monitor.get_vcp_capabilities()
#         if capabilities.get('model') == 'VA27DQSB':
#             monitor.set_input_source('DP1')
def callback_in(indata, frames, time, status):
    """Store the peak-to-peak amplitude of one captured block.

    Used as the ``sd.InputStream`` callback in ``run_input``. The spread
    between the largest and smallest sample of ``indata`` is written into
    ``last_in_values`` at slot ``i % MODULUS``, after which the module-level
    counter ``i`` (the same one ``callback_out`` uses) is incremented.

    Args:
        indata: Block of captured samples handed over by the stream.
        frames: Unused.
        time: Unused.
        status: Stream status object; printed whenever it is truthy.
    """
    # Only ``i`` is rebound here; ``last_in_values`` is mutated in place, so
    # it needs no ``global`` declaration.
    global i
    highest = np.max(indata)
    lowest = np.min(indata)
    spread = abs(highest - lowest)
    slot = i % MODULUS
    last_in_values[slot] = spread
    i += 1
    if status:
        print(status)
def callback_out(outdata, frames, time, status):
    """Store the peak-to-peak amplitude of one loopback block.

    Despite the parameter name, this is installed as the callback of an
    ``sd.InputStream`` (see ``run_output``), so ``outdata`` is a block of
    captured samples. The spread between its largest and smallest sample is
    written into ``last_out_values`` at slot ``i % MODULUS``, after which the
    module-level counter ``i`` (the same one ``callback_in`` uses) is
    incremented.

    Args:
        outdata: Block of samples handed over by the stream.
        frames: Unused.
        time: Unused.
        status: Stream status object; printed whenever it is truthy.
    """
    # Only ``i`` is rebound here; ``last_out_values`` is mutated in place, so
    # it needs no ``global`` declaration.
    global i
    highest = np.max(outdata)
    lowest = np.min(outdata)
    spread = abs(highest - lowest)
    slot = i % MODULUS
    last_out_values[slot] = spread
    i += 1
    if status:
        print(status)
def run_input():
    """Stream the default input device's level to two UDP receivers.

    Opens a mono, low-latency ``sd.InputStream`` on the default input device
    with ``callback_in`` as its callback, then loops forever. Every 10 ms it
    averages ``last_in_values`` and sends one single-byte datagram to each
    receiver:

    * ``192.168.9.139:9999`` gets a linear mapping, ``20 + level * 255``.
    * ``192.168.9.138:9999`` gets a power curve, ``(1.2 + level) ** 7.2 * 20``.

    Both values are truncated to ``int`` and capped at 255 so they fit in one
    byte. The function never returns normally; the stream and the socket are
    released by their context managers when the loop is interrupted.
    """
    with sd.InputStream(channels=1, latency='low', callback=callback_in), \
            socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        while True:
            sd.sleep(10)
            # Read the window once per tick. The stream callback keeps
            # rewriting ``last_in_values`` while this loop runs, so averaging
            # it separately for each receiver could drive them from two
            # different snapshots (and did the same work twice).
            level = np.mean(last_in_values)
            linear_value = min(255, int(20 + (level * 255)))
            curved_value = min(255, int(((1.2 + level) ** 7.2) * 20))
            sock.sendto(bytes([linear_value]), ("192.168.9.139", 9999))
            sock.sendto(bytes([curved_value]), ("192.168.9.138", 9999))
def run_output():
    """Stream the loopback device's level to one UDP receiver.

    Opens a mono, low-latency ``sd.InputStream`` on ``LOOPBACK_DEVICE`` with
    ``callback_out`` as its callback, then loops forever. Every 10 ms it
    averages ``last_out_values``, maps the result through
    ``5 + (1.1 + level) ** 3.2 * 5``, truncates to ``int``, caps it at 255 and
    sends it as a single-byte datagram to ``192.168.9.141:9998``.

    The function never returns normally; the stream and the socket are
    released by their context managers when the loop is interrupted.
    """
    with sd.InputStream(channels=1, latency='low', callback=callback_out,
                        device=LOOPBACK_DEVICE), \
            socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp:
        while True:
            sd.sleep(10)
            level = np.mean(last_out_values)
            curved = ((1.1 + level) ** 3.2) * 5
            payload = bytes([min(255, int(5 + curved))])
            udp.sendto(payload, ("192.168.9.141", 9998))
if __name__ == '__main__':
    # Run each stream loop in its own process: start both first, then block
    # until they exit (both loops are infinite, so this normally waits until
    # the script is interrupted).
    workers = [Process(target=run_input), Process(target=run_output)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
# Disabled single-process variant: both streams opened in one process.
# with sd.InputStream(channels=1, latency='low', callback=callback_in):
#     with sd.InputStream(channels=1, latency='low', callback=callback_out, device=LOOPBACK_DEVICE):
#         with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
#             while True:
#                 sd.sleep(int(10))
#                 # weighted_values = [(MODULUS - i) * value * (1/MODULUS) for i, value in enumerate(last_in_values)]
#                 sock.sendto(bytes([min(255, int(20 + (np.mean(last_in_values)*255)))]), ("192.168.9.139", 9999))
#                 sock.sendto(bytes([min(255, int((((1.2 + np.mean(last_in_values))**7.2) * 20)))]), ("192.168.9.138", 9999))
#                 # sock.sendto(bytes([min(255, int((((1.2 + np.mean(weighted_values))**7.2) * 20)))]), ("192.168.9.138", 9999))
#                 # sock.sendto(bytes([min(255, int(20 + (np.mean(last_in_values)*355)))]), ("192.168.9.140", 9999))
#                 sock.sendto(bytes([min(255, int(5 + (((1.1 + np.mean(last_out_values))**3.2) * 5)))]), ("192.168.9.141", 9998))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment