-
-
Save bierbo/b223b0bab1b037ed2778278b376f16ff to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pulsectl # pip install pulsectl==18.12.5 | |
import time | |
# (other apps) -----------------------+ | |
# | | |
# v | |
# microphone --> call --> pr-out -- mon --> loopback --> speakers | |
# | | | |
# | v | |
# | loopback | |
# | | | |
# v v | |
# loopback -> pr-rec -- mon --> recorder | |
def main(): | |
with pulsectl.Pulse('pulse-record') as pulse: | |
cleanup = [] | |
try: | |
# Locate default source/sink | |
mic = pulse.server_info().default_source_name | |
mic, = [source | |
for source in pulse.source_list() | |
if source.name == mic] | |
speakers = pulse.server_info().default_sink_name | |
speakers, = [sink | |
for sink in pulse.sink_list() | |
if sink.name == speakers] | |
# Build node for call output | |
output_mod = pulse.module_load('module-null-sink', args='sink_properties=device.description=pr-out') | |
cleanup.append(output_mod) | |
time.sleep(0.5) | |
output, = [sink | |
for sink in pulse.sink_list() | |
if sink.owner_module == output_mod] | |
output_monitor, = [source | |
for source in pulse.source_list() | |
if source.name == output.monitor_source_name] | |
# Build node for call output + mic, for recording | |
record_mod = pulse.module_load('module-null-sink', args='sink_properties=device.description=pr-rec') | |
cleanup.append(record_mod) | |
time.sleep(0.5) | |
record, = [sink | |
for sink in pulse.sink_list() | |
if sink.owner_module == record_mod] | |
record_monitor, = [source | |
for source in pulse.source_list() | |
if source.name == record.monitor_source_name] | |
# Send call output to speakers | |
lb = pulse.module_load( | |
'module-loopback', | |
'source=%s sink=%s latency_msec=1' % (output_monitor.name, speakers.name), | |
) | |
cleanup.append(lb) | |
# Restore defaults | |
pulse.sink_default_set(speakers) | |
pulse.source_default_set(mic) | |
# Send mic and call output to recording node | |
lb = pulse.module_load( | |
'module-loopback', | |
'source=%s sink=%s latency_msec=1' % (mic.name, record.name), | |
) | |
cleanup.append(lb) | |
lb = pulse.module_load( | |
'module-loopback', | |
'source=%s sink=%s latency_msec=1' % (output_monitor.name, record.name), | |
) | |
cleanup.append(lb) | |
print("Devices created") | |
# Now we wait for call to show up, to connect it to 'output' | |
# Also wait for audacity, to connect it to 'record_monitor' | |
audacity_found = False | |
calls = set() | |
while True: | |
if not audacity_found: | |
# Find audacity, route from 'record_monitor', set audacity_found | |
for rec in pulse.source_output_list(): | |
app = rec.proplist.get('application.name') | |
if app and 'audacity' in app: | |
print("Audacity found, wiring it up: %s" % app) | |
pulse.source_output_move(rec.index, record_monitor.index) | |
audacity_found = True | |
# Find calls, route to 'output' | |
for out in pulse.sink_input_list(): | |
if out.index in calls: | |
continue | |
app = out.proplist.get('application.name') | |
if app in ( | |
'WEBRTC VoiceEngine', # Zoom | |
'Chrome', 'Chromium', 'Google Chrome', | |
'Skype', | |
): | |
print("Call identified, recording: %s" % app) | |
pulse.sink_input_move(out.index, output.index) | |
calls.add(out.index) | |
# Remove terminated calls from list | |
calls.intersection_update( | |
out.index | |
for out in pulse.sink_input_list() | |
) | |
time.sleep(2) | |
except KeyboardInterrupt: | |
pass | |
finally: | |
print("Cleaning up") | |
for mod in reversed(cleanup): | |
pulse.module_unload(mod) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment