Skip to content

Instantly share code, notes, and snippets.

@tgarc
Last active December 1, 2016 06:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tgarc/5ffbc84f27b42e1aa357c8875cd83524 to your computer and use it in GitHub Desktop.
Save tgarc/5ffbc84f27b42e1aa357c8875cd83524 to your computer and use it in GitHub Desktop.
daemon for portaudio
#!/usr/bin/env python2
"""
Daemon for full duplex streaming with PyAudio
Note that recordings are always delayed by some amount and are therefore longer than the input file
Builds off of mpex.py: https://gist.github.com/tgarc/7120c00eb3eb5f736be93c70ade1d68a
"""
import multiprocessing, threading, Queue
import time
import sys
import urllib2
import pyaudio as pa
import fnmatch
import numpy as np
import traceback
from cStringIO import StringIO
# Number of frames carried by one IPC block; main() passes it to
# PyAudioProcess as ``maxframeblocksize``, where it is multiplied by the
# sample size and channel count to get the block size in bytes.
IPCMAXBLOCKFSZ = 4096 # a size that is guaranteed to be >= the IO block size
# Capacity of the queue that carries recorded blocks out of the audio callback.
QSIZE = 8 # queue size needs to vary depending on
# the ratio of IPC block size to IO block
# size
# Non-zero enables the debug log lines and re-raises KeyboardInterrupt in main().
DEBUG = 1
# Non-zero enables the progress line printed by main() (unless output is stdout).
VERBOSE = 1
class PortAudio(pa.PyAudio):
    """PyAudio subclass adding device lookup and status-flag decoding."""

    # Callback status bit -> human readable label.  The original class read
    # this attribute without ever defining it, so decoding a status raised
    # AttributeError instead of describing the problem.
    _cbstatus_bitmap = {
        pa.paInputUnderflow: 'input underflow',
        pa.paInputOverflow: 'input overflow',
        pa.paOutputUnderflow: 'output underflow',
        pa.paOutputOverflow: 'output overflow',
        pa.paPrimingOutput: 'priming output',
    }

    @classmethod
    def get_callback_status_strings(cls, status):
        """Return the labels of every status bit set in ``status``.

        Labels are ordered by ascending bit value so the result is stable.
        """
        return [label for bit, label in sorted(cls._cbstatus_bitmap.items())
                if status & bit]

    def query(self, hostApi, devname):
        """Return the info dict of the first device matching ``devname``.

        ``hostApi`` is a host API name compared case-insensitively.
        ``devname`` is matched case-insensitively against device names,
        either as an fnmatch pattern or as a plain prefix, and only devices
        belonging to the requested host API are considered.

        Raises IOError when the host API or a matching device is not found.
        """
        wanted_api = hostApi.lower()
        for i in range(self.get_host_api_count()):
            api = self.get_host_api_info_by_index(i)
            if api['name'].lower() == wanted_api:
                api_index = api['index']
                break
        else:
            # Loop finished without a break: no host API carries that name.
            raise IOError("Host API %s not found." % hostApi)

        qname = devname.lower()
        for i in range(self.get_device_count()):
            dev = self.get_device_info_by_index(i)
            if dev['hostApi'] != api_index:
                continue
            dname = dev['name'].lower()
            if fnmatch.fnmatch(dname, qname) or dname.startswith(qname):
                return dev
        raise IOError("Match not found for device name expression %r." % devname)

    def list_streams(self):
        """Print one line per device: name, host API and channel counts."""
        for i in range(self.get_device_count()):
            dev = self.get_device_info_by_index(i)
            api = self.get_host_api_info_by_index(dev['hostApi'])
            # A single pre-formatted argument prints identically whether
            # ``print`` is the Python 2 statement or the Python 3 function.
            print('%s (%s): %d in, %d out' % (dev['name'],
                                              api['name'],
                                              dev['maxInputChannels'],
                                              dev['maxOutputChannels']))
class PyAudioProcess(multiprocessing.Process):
    """Worker process running a full duplex PortAudio stream.

    The master side (``stream``) sends playback bytes over a pipe one block
    at a time, each time the worker notifies the ``full`` condition.  The
    worker stages those bytes in a ring buffer addressed by ``wridx`` (next
    write offset) and ``rdidx`` (next read offset), plays them from the audio
    callback, and sends the recorded ``(status, in_data)`` tuples back over
    the same pipe.  A ``None`` item marks the end of a stream.
    """

    def __init__(self, pipe, device, host_api, rate, channels, pastreamfmt, maxframeblocksize, buffsize=10, timeout=1, **kwargs):
        """Configure the worker; the arguments are forwarded to ``main``.

        pipe              -- worker end of the duplex pipe
        device, host_api  -- device name expression and host API name
        rate, channels    -- stream sample rate and channel count
        pastreamfmt       -- PortAudio sample format constant
        maxframeblocksize -- frames per IPC block
        buffsize          -- ring buffer capacity, in IPC blocks
        timeout           -- seconds to wait for a recorded block
        kwargs            -- passed on to multiprocessing.Process
        """
        super(PyAudioProcess, self).__init__(target=self.main, args=(pipe, device, host_api, rate, channels, pastreamfmt, maxframeblocksize, buffsize), **kwargs)
        self.exit = multiprocessing.Event()
        self.full = multiprocessing.Condition()
        self.ready = multiprocessing.Event()
        self.timeout = timeout
        # size in bytes of one IPC block
        self.blocksize = pa.pa.get_sample_size(pastreamfmt)*maxframeblocksize*channels
        self.count = 0
        self.wridx = 0
        self.rdidx = 0

    def _dbg(self, *fields):
        """Append ``fields`` to the debug log, space separated, one line."""
        self.dbglog.write(' '.join([str(field) for field in fields]) + '\n')

    def _wait_ready(self):
        """Poll until the master sets ``ready`` or ``exit`` is requested."""
        while not self.ready.is_set():
            if self.exit.is_set():
                break
            time.sleep(0.1)

    def _request_block(self):
        """Wake the master's pipe writer so that it sends one more block."""
        with self.full:
            self.full.notify()

    def main(self, pipe, device, host_api, rate, channels, pastreamfmt, maxframeblocksize, buffsize=10):
        """Process body: serve one stream per ``ready`` signal until ``exit``.

        The debug log is written to 'padaemon.log' and the pipe and PortAudio
        instance are released on the way out, including when an error
        escapes (the original skipped all of that on any exception).
        """
        # initialize portaudio
        painst = PortAudio()
        self.dbglog = StringIO()
        try:
            # find the device
            padev = painst.query(host_api, device)
            inpbuff = bytearray(self.blocksize*buffsize)

            # Wait for master to ready
            self._wait_ready()

            # main loop
            while not self.exit.is_set():
                # every stream starts from an empty ring buffer
                self.count = self.wridx = self.rdidx = 0
                self._dbg("READY!\n------")
                self.ready.clear()

                # we *must* write to the inp buffer before doing anything or
                # the buffer checking logic will break down
                for _ in range(4):
                    self._request_block()
                    self.wridx += pipe.recv_bytes_into(inpbuff, self.wridx)

                # do the stream thing
                try:
                    self.playrec(pipe, inpbuff, painst, padev['index'], rate, channels, pastreamfmt, timeout=self.timeout)
                except KeyboardInterrupt:
                    break

                # wait for master to ready again
                self._wait_ready()
        finally:
            # cleanup
            with open('padaemon.log', 'w') as dbglog:
                dbglog.write(self.dbglog.getvalue() + '\n')
            pipe.close()
            painst.terminate()

    def _read(self, inpbuff, blocksize):
        """Take up to ``blocksize`` bytes from the ring buffer.

        Returns a new bytearray and advances ``rdidx``.  Fewer than
        ``blocksize`` bytes are returned once the read position reaches the
        write position.
        """
        wridx = self.wridx

        # read logic is a bit more complex than the write since we don't
        # necessarily know the required chunk size beforehand, so we may be
        # required to read around the edges of the receive buffer
        nextidx = self.rdidx+blocksize

        # we assume that once the read index catches up to the write
        # index the stream is finished (this is technically an underflow
        # condition, but we just assume underflow will never happen)
        if self.rdidx <= wridx < nextidx:
            nextidx = wridx

        if nextidx >= len(inpbuff): # corner case: wrap around read
            nextidx %= len(inpbuff)
            if wridx < nextidx: # we've reached the write index
                nextidx = wridx
            indata = inpbuff[self.rdidx:]
            if nextidx > 0:
                indata += inpbuff[:nextidx]
        else:
            indata = inpbuff[self.rdidx:nextidx]

        self.rdidx = nextidx
        return indata

    def _receive(self, pipe, inpbuff):
        """Request one block from the master and store it at ``wridx``.

        Returns the number of bytes received, or -1 without touching the
        pipe when the ring buffer has no room for another block.
        """
        # no more rx buffer space; wait for processing
        if self.wridx <= self.rdidx <= self.wridx+self.blocksize:
            return -1

        # Logic is a bit simplified here since we can guarantee buffer
        # size is a multiple of the transmission size (except for the
        # last chunk which will be smaller)
        self._request_block()
        nbytes = pipe.recv_bytes_into(inpbuff, self.wridx)
        self.wridx = (self.wridx+nbytes) % len(inpbuff)
        if DEBUG:
            self._dbg("RX:", self.count, self.wridx, self.rdidx, abs(self.wridx-self.rdidx))
        return nbytes

    def playrec(self, pipe, inpbuff, painst, devidx, rate, channels, pastreamfmt, timeout=1):
        """Run one full duplex stream until the playback data runs out.

        The audio callback queues every recorded block and plays the next
        chunk of ``inpbuff``; this method forwards queued blocks to the
        master over ``pipe`` and keeps the ring buffer topped up.  The stream
        is always closed on exit.  ``Queue.Empty`` propagates when no
        recorded block arrives within ``timeout`` seconds.
        """
        q = Queue.Queue(QSIZE)
        framesize = channels*pa.pa.get_sample_size(pastreamfmt)
        self.last_time = 0

        def pamincallback(in_data, frame_count, time_info, status):
            pcsr = pa.paContinue
            if DEBUG:
                self._dbg(self.count, self.wridx, self.rdidx, abs(self.wridx-self.rdidx), q.qsize(), 1000*(time_info['current_time'] - self.last_time))
            self.last_time = time_info['current_time']

            # send off input data
            try:
                q.put((status, in_data), block=False)
            except Queue.Full:
                # the consumer fell behind: drop everything and abort
                traceback.print_exc()
                q.queue.clear()
                q.put(None)
                return '', pa.paAbort

            # read in the next block of frames
            txbuff = self._read(inpbuff, frame_count*framesize)

            # This is our last callback!  The test is done in bytes so that
            # it holds for every sample format; the original counted int32
            # samples regardless of ``pastreamfmt``.
            if len(txbuff) < frame_count*framesize:
                if DEBUG:
                    self._dbg(self.count, self.wridx, self.rdidx, abs(self.wridx-self.rdidx), q.qsize(), time_info['current_time'])
                    self._dbg("----\nDone")
                pcsr = pa.paComplete
                q.put(None)

            self.count += frame_count
            return (bytes(txbuff), pcsr)

        stream = painst.open(rate=rate, channels=channels,
                             format=pastreamfmt, input=True, output=True,
                             frames_per_buffer=0, # Let backend decide buffersize
                             input_device_index=devidx,
                             output_device_index=devidx,
                             stream_callback=pamincallback,
                             start=False)
        try:
            stream.start_stream()
            while stream.is_active() and not self.exit.is_set():
                item = q.get(timeout=timeout)
                pipe.send(item)
                if item is None:
                    break

                nbytes = self._receive(pipe, inpbuff)
                if nbytes == -1: # buffer's full right now
                    continue
                elif nbytes < self.blocksize: # we must be done
                    # forward whatever is still recorded, up to the end marker
                    while True:
                        item = q.get()
                        pipe.send(item)
                        if item is None:
                            break
                    break
        finally:
            stream.close()

    def stream(self, pipe, reader=None, writer=None):
        """Master side of one stream; call it in the parent process.

        pipe   -- master end of the duplex pipe
        reader -- callable(nbytes) returning playback bytes, or None to
                  send zero-filled blocks
        writer -- callable(bytes) receiving recorded audio, or None to
                  discard it

        Returns when the worker sends the end marker, the pipe reaches EOF
        or ``exit`` is set.  Raises multiprocessing.ProcessError when a
        recorded block carries a non-zero callback status.
        """
        assert self.is_alive(), "You need to start a PyAudioProcess before you can start streaming!"

        def pipewriter(fill):
            # ``with`` releases the condition's lock when the loop ends; the
            # original left it held forever, blocking every later stream.
            with self.full:
                while True:
                    self.full.wait()
                    outdata = fill if reader is None else reader(self.blocksize)
                    if len(outdata):
                        pipe.send_bytes(outdata)
                    # a short block is the last one
                    if len(outdata) < self.blocksize:
                        break

        zeros = b'\x00'*self.blocksize
        prp = threading.Thread(target=pipewriter, kwargs={'fill': zeros})
        prp.daemon = True
        prp.start()

        self.ready.set()
        while not self.exit.is_set():
            # Block briefly instead of spinning on a zero-timeout poll.
            if not pipe.poll(0.05):
                continue
            try:
                item = pipe.recv()
            except EOFError:
                break
            if item is None:
                break
            stat, indata = item
            if stat != 0:
                raise multiprocessing.ProcessError("PortAudioError: { %s }" % ', '.join(PortAudio.get_callback_status_strings(stat)))
            if writer is not None:
                writer(indata)
def main(argv=None):
    """Command line entry point: play INPUT and record to OUTPUT.

    ``-l`` lists the available devices and exits.  Otherwise the arguments
    are ``INPUT OUTPUT [--loop]`` where INPUT is a file path, an http URL,
    ``-`` for stdin or ``null`` for zero-filled blocks, and OUTPUT is a file
    path, ``-`` for stdout or ``null`` to discard the recording.  With
    ``--loop`` the input is rewound and streamed again after each pass.

    Returns 0 when streaming finishes and 2 when arguments are missing.
    """
    if argv is None:
        argv = sys.argv[1:]

    if argv and argv[0] == '-l':
        PortAudio().list_streams()
        sys.exit(0)

    if len(argv) < 2:
        # The original indexed argv blindly and died with an IndexError.
        sys.stderr.write("usage: -l | INPUT OUTPUT [--loop]\n")
        return 2

    inpf, outf = argv[:2]
    # Always bound: the original left ``loop`` undefined whenever a third
    # argument other than '--loop' was supplied.
    loop = len(argv) > 2 and argv[2] == '--loop'

    if inpf == '-':
        inp_fh = sys.stdin
    elif inpf == 'null':
        inp_fh = None
    elif inpf.startswith('http'):
        inp_fh = urllib2.urlopen(inpf)
    else:
        inp_fh = open(inpf, 'rb')

    if outf == '-':
        out_fh = sys.stdout
    elif outf == 'null':
        out_fh = None
    else:
        out_fh = open(outf, 'wb+')

    def position(fh):
        # Status-line helper: '-' when there is no handle or it cannot
        # report a position.
        if fh is None:
            return '-'
        try:
            return str(fh.tell())
        except (AttributeError, IOError, ValueError):
            return '-'

    # define the device stream here
    # dev, api, rate, channels, streamfmt = '*ALC3232*', 'alsa', 48000, 2, pa.paInt32
    dev, api, rate, channels, streamfmt = 'mini', 'asio', 48000, 8, pa.paInt32

    hostpipe, clientpipe = multiprocessing.Pipe()
    pap = PyAudioProcess(clientpipe, dev, api, rate, channels, streamfmt, IPCMAXBLOCKFSZ, timeout=1, name='pyaudio')

    statline = "{{{:^9d}}} {:6.3f}s {:>10s} bytes sent, {:>10s} bytes received\r"

    pap.start()
    clientpipe.close()

    # never mix the progress line into recorded data written to stdout
    printout = out_fh != sys.stdout and VERBOSE
    reader = inp_fh.read if inp_fh is not None else None
    writer = out_fh.write if out_fh is not None else None
    try:
        while pap.is_alive():
            thread = threading.Thread(target=pap.stream, args=(hostpipe, reader, writer), name='streamer')
            t1 = time.time()
            i = 0
            thread.start()
            while thread.is_alive():
                time.sleep(0.01)
                if printout:
                    sys.stdout.write(statline.format(i, time.time()-t1, position(inp_fh), position(out_fh)))
                    i += 1
            if not loop:
                break
            if printout:
                sys.stdout.write('\n')
            # rewind for the next pass; 'null' input has nothing to rewind
            if inp_fh is not None:
                inp_fh.seek(0)
    except KeyboardInterrupt:
        if DEBUG:
            raise
    except BaseException:
        pap.terminate()
        raise
    finally:
        if printout:
            sys.stdout.write('\n')
        pap.exit.set()
        hostpipe.close()
        if out_fh is not None:
            out_fh.close()
        # close inputs opened here; stdin is left alone
        if inp_fh is not None and inp_fh is not sys.stdin:
            inp_fh.close()

    return 0
if __name__ == '__main__':
    # Run the command line interface and report its result as the exit code.
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment