Skip to content

Instantly share code, notes, and snippets.

@devilholk
Created September 2, 2017 14:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devilholk/96f1e30c748fcc5b6e50882fa2f79e56 to your computer and use it in GitHub Desktop.
Save devilholk/96f1e30c748fcc5b6e50882fa2f79e56 to your computer and use it in GitHub Desktop.
Using python and numpy to filter a pulse audio stream
import numpy, subprocess, struct, sys
#pacmd load-module module-null-sink sink_name=Python_input
#pacmd update-sink-proplist Python_input device.description="Python sound processing input"
#Note - seems we can't have spaces in description so replace with _ or something for now
# - it might be a bash thing so it might work from subprocess.call, have not tested
#Figure out which monitor stream corresponds to this sink and use it in the pamon call below.
#Also remember to redirect the source application to this sink (for instance with pavucontrol).
#There is quite a delay at the moment. One thing I have not tested yet is
#adding flags to pacat and pamon to use much smaller buffers, which should help,
#but we still cannot avoid having to wait for a full FFT sample window.
# Number of stereo frames handled per block; this is also the FFT window
# length used for each channel.
buf_size = 512
# Centre frequency in Hz of every rfft bin. The 1/44100 sample spacing means
# a 44.1 kHz stream is assumed.
freq = numpy.fft.rfftfreq(buf_size, 1.0/44100.00)

# fft_mul holds one real gain per rfft bin (same length as freq). Exactly one
# preset should be active; the others are kept commented out as alternatives.

#Notch filter
#notch = 5000.0
#notch_w = 1500.0
#fft_mul = numpy.fromiter((min((f - notch)**2 / notch_w**2, 1.0) for f in freq), dtype=float)
#Bandpass filter
# bp_freq = 7000
# bp_width = 15000
# fft_mul = numpy.fromiter((1.0-min(((f - bp_freq)**2 / bp_width**2), 1.0) for f in freq), dtype=float)
#Freq boost (will distort)
# b_freq = 200
# b_width = 3000
# b_gain = 0.5
# fft_mul = numpy.fromiter((1.0+b_gain-min(((f - b_freq)**2 / b_width**2), b_gain) for f in freq), dtype=float)


def sgn(x):
    """Return -1 for negative x and 1 otherwise (zero counts as positive)."""
    return -1 if x < 0 else 1


#Lowpass filter - bass boost 1
# lp_freq = 100
# lp_width = 5000
# lp_min = 0.5
# fft_mul = numpy.fromiter((1.0-max(min(sgn(f-lp_freq)* ((f - lp_freq)**2 / lp_width**2), lp_min), 0.0) for f in freq), dtype=float)
#Lowpass filter - bass boost 2
# Gain is 1.0 at and below lp_freq, then drops quadratically with distance
# from lp_freq (scaled by lp_width) until it bottoms out at 1.0 - lp_min.
lp_freq = 0
lp_width = 2000
lp_min = 0.3
# Vectorized form of the per-bin expression used by the presets above:
#   1.0 - max(min(sgn(f - lp_freq) * (f - lp_freq)**2 / lp_width**2, lp_min), 0.0)
offset = freq - lp_freq
signed_curve = numpy.where(offset < 0, -1.0, 1.0) * (offset ** 2 / lp_width ** 2)
fft_mul = 1.0 - numpy.maximum(numpy.minimum(signed_curve, lp_min), 0.0)
print(fft_mul)
#sys.exit()
def filter_block(raw, gains):
    """Apply per-bin FFT gains to one block of interleaved stereo PCM.

    Parameters:
        raw: bytes holding whole stereo frames of native-endian signed
            16-bit samples, interleaved left/right.
        gains: real array with one multiplier per rfft bin, i.e. of length
            ``frames // 2 + 1`` where ``frames = len(raw) // 4``.

    Returns:
        bytes of the same length and layout as ``raw`` with both channels
        filtered.

    Raises:
        ValueError: if ``raw`` is not a whole number of 16-bit samples or
            ``gains`` does not match the spectrum length.
    """
    # frombuffer replaces the deprecated binary mode of numpy.fromstring;
    # astype(float) gives a writable copy scaled to roughly [-1, 1].
    samples = numpy.frombuffer(raw, dtype=numpy.int16).astype(float)
    samples /= 32767.0
    left = samples[::2]
    right = samples[1::2]
    frames = left.size

    spectrum_left = numpy.fft.rfft(left) * gains
    spectrum_right = numpy.fft.rfft(right) * gains

    # Re-interleave the two filtered channels as L, R, L, R, ...
    mixed = numpy.empty(2 * frames, dtype=float)
    mixed[::2] = numpy.fft.irfft(spectrum_left, frames)
    mixed[1::2] = numpy.fft.irfft(spectrum_right, frames)

    mixed *= 32767.0
    # Round instead of truncating so a unity-gain filter is lossless, and
    # saturate so boosted or overshooting samples do not wrap around in int16.
    numpy.rint(mixed, out=mixed)
    numpy.clip(mixed, -32768.0, 32767.0, out=mixed)
    return mixed.astype(numpy.int16).tobytes()


def main():
    """Read audio from pamon, filter it block by block and play it via pacat.

    Uses the module-level ``buf_size`` (frames per block) and ``fft_mul``
    (per-bin gains). Runs until the capture stream ends or Ctrl-C is pressed;
    both child processes are always killed and reaped on the way out.
    """
    # Each frame is two channels of 2-byte samples.
    block_bytes = buf_size * 4
    proc = subprocess.Popen(('pamon', '--monitor-stream', '1',), stdout=subprocess.PIPE)
    try:
        proc2 = subprocess.Popen(('pacat',), stdin=subprocess.PIPE)
        try:
            while True:
                raw = proc.stdout.read(block_bytes)
                # read() only returns fewer bytes at end of stream; a partial
                # block would not match the length of fft_mul, so stop here.
                if len(raw) < block_bytes:
                    break
                proc2.stdin.write(filter_block(raw, fft_mul))
                # The pipe is buffered; flush so each block is played promptly.
                proc2.stdin.flush()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the stream.
            pass
        finally:
            proc2.kill()
            proc2.wait()
    finally:
        proc.kill()
        proc.wait()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment