Created
September 2, 2017 14:30
-
-
Save devilholk/96f1e30c748fcc5b6e50882fa2f79e56 to your computer and use it in GitHub Desktop.
Using python and numpy to filter a pulse audio stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct
import subprocess
import sys

import numpy
#pacmd load-module module-null-sink sink_name=Python_input
#pacmd update-sink-proplist Python_input device.description="Python sound processing input"
#Note - it seems we can't have spaces in the description, so replace them with _ or something for now
#     - it might be a bash thing, so it might work from subprocess.call; I have not tested that
#Figure out which monitor stream this is and put its index down below where we call pamon
#Also remember to redirect the source application to this sink (for instance with pavucontrol)
#There is quite a delay now, but what I have not tested is
#adding flags to pacat and pamon to use much smaller buffers, which should help,
#but we still cannot get away from having to wait for the FFT sample window
# Number of frames (samples per channel) in one FFT window.
buf_size = 512
# Sample rate in Hz; assumed to match the pamon/pacat stream rate
# (their documented default is 44100 Hz).
sample_rate = 44100.0
# Centre frequency in Hz of each rfft bin for a buf_size-sample window.
freq = numpy.fft.rfftfreq(buf_size, 1.0 / sample_rate)
#Notch filter
#notch = 5000.0
#notch_w = 1500.0
#fft_mul = numpy.fromiter((min((f - notch)**2 / notch_w**2, 1.0) for f in freq), dtype=float)
#Bandpass filter
# bp_freq = 7000
# bp_width = 15000
# fft_mul = numpy.fromiter((1.0-min(((f - bp_freq)**2 / bp_width**2), 1.0) for f in freq), dtype=float)
#Freq boost (will distort)
# b_freq = 200
# b_width = 3000
# b_gain = 0.5
# fft_mul = numpy.fromiter((1.0+b_gain-min(((f - b_freq)**2 / b_width**2), b_gain) for f in freq), dtype=float)
def sgn(x):
    """Return -1 if x is negative, otherwise 1 (zero counts as positive)."""
    return -1 if x < 0 else 1
#Lowpass filter - bass boost 1
# lp_freq = 100
# lp_width = 5000
# lp_min = 0.5
# fft_mul = numpy.fromiter((1.0-max(min(sgn(f-lp_freq)* ((f - lp_freq)**2 / lp_width**2), lp_min), 0.0) for f in freq), dtype=float)
#Lowpass filter - bass boost 2
lp_freq = 0       # frequency (Hz) above which attenuation starts
lp_width = 2000   # scale (Hz) of the quadratic roll-off
lp_min = 0.3      # maximum attenuation; gain never drops below 1 - lp_min
# Per-bin gain applied to the spectrum: 1.0 at and below lp_freq, falling
# quadratically with distance above lp_freq, and flooring at 1 - lp_min.
offset = freq - lp_freq
# Signed quadratic distance: negative below lp_freq so the clip below maps
# those bins to zero attenuation.
rolloff = numpy.where(offset < 0, -1.0, 1.0) * (offset ** 2 / lp_width ** 2)
fft_mul = 1.0 - numpy.clip(rolloff, 0.0, lp_min)
print(fft_mul)
#sys.exit() | |
# Capture audio from monitor stream index 1 (change the index to match the
# stream being monitored) and play the filtered result back through pacat.
proc = subprocess.Popen(('pamon', '--monitor-stream', '1'), stdout=subprocess.PIPE)
proc2 = subprocess.Popen(('pacat',), stdin=subprocess.PIPE)

# One window is buf_size stereo frames of 16-bit samples:
# 2 channels * 2 bytes per sample = 4 bytes per frame.
chunk_bytes = buf_size * 4

try:
    while True:
        raw = proc.stdout.read(chunk_bytes)
        if len(raw) < chunk_bytes:
            # pamon exited or closed its pipe. A partial window would not
            # match the length of fft_mul, so stop instead of processing it.
            break

        # Interleaved int16 samples -> floats scaled to roughly [-1, 1].
        buf1 = numpy.frombuffer(raw, dtype=numpy.int16).astype(float)
        buf1 /= 32767.0

        # De-interleave: even indices are the left channel, odd the right.
        buf2_l = buf1[::2]
        buf2_r = buf1[1::2]

        # Filter each channel by scaling its spectrum with the gain curve.
        fft1_l = numpy.fft.rfft(buf2_l) * fft_mul
        fft1_r = numpy.fft.rfft(buf2_r) * fft_mul
        syn1_l = numpy.fft.irfft(fft1_l, n=buf_size)
        syn1_r = numpy.fft.irfft(fft1_r, n=buf_size)

        # Re-interleave as L0, R0, L1, R1, ... and scale back to int16 range.
        syn2 = numpy.column_stack((syn1_l, syn1_r)).ravel()
        syn2 *= 32767.0
        # Clip before the cast so overshoot saturates instead of wrapping.
        numpy.clip(syn2, -32768.0, 32767.0, out=syn2)
        syn3 = syn2.astype(numpy.int16)

        proc2.stdin.write(syn3.tobytes())
        # The pipe is buffered; flush so each window is played immediately.
        proc2.stdin.flush()
finally:
    # Always stop and reap both helpers, including on Ctrl-C or a pipe error.
    proc.kill()
    proc2.kill()
    proc.wait()
    proc2.wait()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment