Skip to content

Instantly share code, notes, and snippets.

@rbn42
Last active January 25, 2021 11:58
Show Gist options
  • Save rbn42/30767cb36741b9ccbcc661b0841eda8f to your computer and use it in GitHub Desktop.
Save rbn42/30767cb36741b9ccbcc661b0841eda8f to your computer and use it in GitHub Desktop.
class SoundCardSource:
def __init__(self, channel_count, sample_rate, device_id, fps):
self.channel_count = channel_count
self.sample_rate = sample_rate
self.device_id = device_id
self.blocksize = self.sample_rate // fps
self.start()
def read(self, fps=None):
if fps is None:
b = self.blocksize
else:
b = self.sample_rate // fps
data = [stream.record(b) for stream in self.streams]
data = sum(data) / len(data)
return data
def start(self):
import soundcard as sc
try:
sc.set_name('Panon')
except (AttributeError, NotImplementedError):
pass
if self.device_id == 'all':
mics = sc.all_microphones(exclude_monitors=False)
elif self.device_id == 'allspeakers':
mics = [mic for mic in sc.all_microphones(exclude_monitors=False) if mic.id.endswith('.monitor')]
elif self.device_id == 'allmicrophones':
mics = [mic for mic in sc.all_microphones(exclude_monitors=True)]
elif self.device_id == 'default':
mics = [sc.default_microphone()]
else:
mics = [sc.get_microphone(
self.device_id,
include_loopback=False,
exclude_monitors=False,
)]
self.streams = []
for mic in mics:
stream = mic.recorder(
self.sample_rate,
self.channel_count,
)
stream.__enter__()
self.streams.append(stream)
class PyaudioSource:
def __init__(self, channel_count, sample_rate, device_index, fps):
self.channel_count = channel_count
self.sample_rate = sample_rate
self.chunk = self.sample_rate // fps
if device_index is not None:
device_index = int(device_index)
self.device_index = device_index
self.start()
def read(self, fps=None):
# Ignores PyAudio exception on overflow.
if fps is None:
c = self.chunk
else:
c = self.sample_rate // fps
result = self.stream.read(c, exception_on_overflow=False)
return result
def start(self):
import pyaudio
p = pyaudio.PyAudio()
self.stream = p.open(
format=pyaudio.paInt16,
channels=self.channel_count,
rate=self.sample_rate,
input=True,
input_device_index=self.device_index,
)
import asyncio
import numpy as np
ps=SoundCardSource(2,44100,'alsa_output.pci-0000_00_1b.0.analog-stereo.monitor',30)
async def timeout_callback():
await asyncio.sleep(1.0/100)
data=ps.read()
n=np.sum(np.abs(data)) #(sum([i for i in data]))
#print(n)
if n>0:
print('\rSpotify is playing ',end='')
else:
print('\rSpotify is not playing',end='')
async def main():
while True:
await asyncio.sleep(1.0/30)
asyncio.ensure_future(timeout_callback())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment