Skip to content

Instantly share code, notes, and snippets.

@bierbo
Forked from remram44/pulse_record.py
Created December 21, 2021 15:41
Show Gist options
  • Save bierbo/b223b0bab1b037ed2778278b376f16ff to your computer and use it in GitHub Desktop.
Save bierbo/b223b0bab1b037ed2778278b376f16ff to your computer and use it in GitHub Desktop.
import pulsectl # pip install pulsectl==18.12.5
import time
# (other apps) -----------------------+
# |
# v
# microphone --> call --> pr-out -- mon --> loopback --> speakers
# | |
# | v
# | loopback
# | |
# v v
# loopback -> pr-rec -- mon --> recorder
def main():
with pulsectl.Pulse('pulse-record') as pulse:
cleanup = []
try:
# Locate default source/sink
mic = pulse.server_info().default_source_name
mic, = [source
for source in pulse.source_list()
if source.name == mic]
speakers = pulse.server_info().default_sink_name
speakers, = [sink
for sink in pulse.sink_list()
if sink.name == speakers]
# Build node for call output
output_mod = pulse.module_load('module-null-sink', args='sink_properties=device.description=pr-out')
cleanup.append(output_mod)
time.sleep(0.5)
output, = [sink
for sink in pulse.sink_list()
if sink.owner_module == output_mod]
output_monitor, = [source
for source in pulse.source_list()
if source.name == output.monitor_source_name]
# Build node for call output + mic, for recording
record_mod = pulse.module_load('module-null-sink', args='sink_properties=device.description=pr-rec')
cleanup.append(record_mod)
time.sleep(0.5)
record, = [sink
for sink in pulse.sink_list()
if sink.owner_module == record_mod]
record_monitor, = [source
for source in pulse.source_list()
if source.name == record.monitor_source_name]
# Send call output to speakers
lb = pulse.module_load(
'module-loopback',
'source=%s sink=%s latency_msec=1' % (output_monitor.name, speakers.name),
)
cleanup.append(lb)
# Restore defaults
pulse.sink_default_set(speakers)
pulse.source_default_set(mic)
# Send mic and call output to recording node
lb = pulse.module_load(
'module-loopback',
'source=%s sink=%s latency_msec=1' % (mic.name, record.name),
)
cleanup.append(lb)
lb = pulse.module_load(
'module-loopback',
'source=%s sink=%s latency_msec=1' % (output_monitor.name, record.name),
)
cleanup.append(lb)
print("Devices created")
# Now we wait for call to show up, to connect it to 'output'
# Also wait for audacity, to connect it to 'record_monitor'
audacity_found = False
calls = set()
while True:
if not audacity_found:
# Find audacity, route from 'record_monitor', set audacity_found
for rec in pulse.source_output_list():
app = rec.proplist.get('application.name')
if app and 'audacity' in app:
print("Audacity found, wiring it up: %s" % app)
pulse.source_output_move(rec.index, record_monitor.index)
audacity_found = True
# Find calls, route to 'output'
for out in pulse.sink_input_list():
if out.index in calls:
continue
app = out.proplist.get('application.name')
if app in (
'WEBRTC VoiceEngine', # Zoom
'Chrome', 'Chromium', 'Google Chrome',
'Skype',
):
print("Call identified, recording: %s" % app)
pulse.sink_input_move(out.index, output.index)
calls.add(out.index)
# Remove terminated calls from list
calls.intersection_update(
out.index
for out in pulse.sink_input_list()
)
time.sleep(2)
except KeyboardInterrupt:
pass
finally:
print("Cleaning up")
for mod in reversed(cleanup):
pulse.module_unload(mod)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment