Last active
January 25, 2021 11:58
-
-
Save rbn42/30767cb36741b9ccbcc661b0841eda8f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SoundCardSource:
    """Capture audio from one or more devices through the ``soundcard`` package.

    One recorder is opened per selected device; ``read`` records a block from
    each of them and returns the mean of those blocks.

    ``device_id`` selects the devices:

    * ``'all'`` -- every microphone, monitors included
    * ``'allspeakers'`` -- only devices whose id ends with ``.monitor``
    * ``'allmicrophones'`` -- every microphone, monitors excluded
    * ``'default'`` -- the default microphone
    * anything else -- handed to ``soundcard.get_microphone`` as the device id
    """

    def __init__(self, channel_count, sample_rate, device_id, fps):
        self.channel_count = channel_count
        self.sample_rate = sample_rate
        self.device_id = device_id
        # Number of frames recorded per read() when no explicit fps is given.
        self.blocksize = self.sample_rate // fps
        self.start()

    def read(self, fps=None):
        """Record one block from every open stream and return their mean.

        ``fps`` overrides the block size for this call only
        (``sample_rate // fps`` frames); by default ``blocksize`` is used.
        Raises ZeroDivisionError when no stream is open.
        """
        if fps is None:
            frames = self.blocksize
        else:
            frames = self.sample_rate // fps
        blocks = [stream.record(frames) for stream in self.streams]
        return sum(blocks) / len(blocks)

    def start(self):
        """Open a recorder for every device selected by ``device_id``.

        Streams left open by an earlier ``start`` are released first. If a
        recorder fails to open, the ones opened so far are released before
        the exception propagates.
        """
        import soundcard as sc

        try:
            sc.set_name('Panon')
        except (AttributeError, NotImplementedError):
            # Best effort: carry on when set_name is missing or unimplemented.
            pass

        if self.device_id == 'all':
            mics = sc.all_microphones(exclude_monitors=False)
        elif self.device_id == 'allspeakers':
            mics = [
                mic
                for mic in sc.all_microphones(exclude_monitors=False)
                if mic.id.endswith('.monitor')
            ]
        elif self.device_id == 'allmicrophones':
            mics = list(sc.all_microphones(exclude_monitors=True))
        elif self.device_id == 'default':
            mics = [sc.default_microphone()]
        else:
            mics = [sc.get_microphone(
                self.device_id,
                include_loopback=False,
                exclude_monitors=False,
            )]

        # Release anything a previous start() opened; leaves self.streams == [].
        self.stop()
        try:
            for mic in mics:
                stream = mic.recorder(
                    self.sample_rate,
                    self.channel_count,
                )
                # The recorder is a context manager; it is entered here and
                # exited in stop() because it must outlive this method.
                stream.__enter__()
                self.streams.append(stream)
        except BaseException:
            # Do not leak the recorders that were already entered.
            self.stop()
            raise

    def stop(self):
        """Exit every open recorder. Safe to call repeatedly."""
        streams = getattr(self, 'streams', [])
        self.streams = []
        for stream in streams:
            stream.__exit__(None, None, None)
class PyaudioSource:
    """Capture audio from a single input device through PyAudio.

    ``device_index`` is converted with ``int()`` unless it is ``None``; the
    resulting value is passed to PyAudio as ``input_device_index``.
    """

    def __init__(self, channel_count, sample_rate, device_index, fps):
        self.channel_count = channel_count
        self.sample_rate = sample_rate
        # Number of frames read per read() when no explicit fps is given.
        self.chunk = self.sample_rate // fps
        if device_index is not None:
            device_index = int(device_index)
        self.device_index = device_index
        self.start()

    def read(self, fps=None):
        """Read one chunk from the input stream and return it unchanged.

        ``fps`` overrides the chunk size for this call only
        (``sample_rate // fps`` frames); by default ``chunk`` is used.
        """
        if fps is None:
            frames = self.chunk
        else:
            frames = self.sample_rate // fps
        # Ignores PyAudio exception on overflow.
        return self.stream.read(frames, exception_on_overflow=False)

    def start(self):
        """Open the 16-bit input stream, releasing any earlier one first."""
        import pyaudio

        self.stop()
        p = pyaudio.PyAudio()
        try:
            self.stream = p.open(
                format=pyaudio.paInt16,
                channels=self.channel_count,
                rate=self.sample_rate,
                input=True,
                input_device_index=self.device_index,
            )
        except BaseException:
            # Opening failed: do not leave the PyAudio instance dangling.
            p.terminate()
            raise
        # Kept so that stop() can terminate it later.
        self._pyaudio = p

    def stop(self):
        """Close the stream and terminate PyAudio. Safe to call repeatedly."""
        stream = getattr(self, 'stream', None)
        pa = getattr(self, '_pyaudio', None)
        self.stream = None
        self._pyaudio = None
        try:
            if stream is not None:
                stream.stop_stream()
                stream.close()
        finally:
            # Terminate even when closing the stream raised.
            if pa is not None:
                pa.terminate()
import asyncio | |
import numpy as np | |
# Module-level capture source read by timeout_callback(): two channels at
# 44100 Hz from a hard-coded '.monitor' device id, sized for 30 reads per second.
ps = SoundCardSource(
    channel_count=2,
    sample_rate=44100,
    device_id='alsa_output.pci-0000_00_1b.0.analog-stereo.monitor',
    fps=30,
)
async def timeout_callback():
    """Sample the capture source once and refresh the one-line status.

    Waits 10 ms, reads one block from the module-level ``ps`` source and
    reports "playing" when the block contains any non-zero sample.
    """
    await asyncio.sleep(1.0 / 100)
    # NOTE: ps.read() is called synchronously, so nothing else on the event
    # loop runs until it returns.
    data = ps.read()
    # Total absolute amplitude; zero only when every sample in the block is zero.
    level = np.sum(np.abs(data))
    if level > 0:
        status = 'Spotify is playing'
    else:
        status = 'Spotify is not playing'
    # '\r' redraws the same terminal line, so pad the shorter message to the
    # width of the longer one; otherwise the tail of "not playing" stays
    # visible after switching back to "playing".
    print('\r' + status.ljust(len('Spotify is not playing')), end='')
async def main():
    """Schedule ``timeout_callback`` roughly 30 times per second, forever.

    Never returns on its own; it ends only by cancellation or an exception.
    """
    # The event loop keeps only weak references to tasks, so a task nobody
    # else references may be garbage-collected before it finishes. Hold each
    # one here until it is done.
    pending = set()
    while True:
        await asyncio.sleep(1.0 / 30)
        task = asyncio.ensure_future(timeout_callback())
        pending.add(task)
        task.add_done_callback(pending.discard)
# Script entry point: run main() on a freshly created event loop that is also
# installed as the current loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # main() loops forever, so this returns only by raising
    # (for example on an interrupt or an error inside main()).
    loop.run_until_complete(main())
finally:
    # Finalize any outstanding async generators, then release the loop.
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment