Skip to content

Instantly share code, notes, and snippets.

@tgarc
Last active February 5, 2017 21:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tgarc/3ee8c94743a57ee096a05f1985333782 to your computer and use it in GitHub Desktop.
Save tgarc/3ee8c94743a57ee096a05f1985333782 to your computer and use it in GitHub Desktop.
portaudio/libsndfile for half and full duplex audio streaming
# sets up a full duplex alsa loopback device
# Direct hardware PCM on the "Loopback" card, device 0 / subdevice 0.
# The stream parameters are pinned to S32_LE samples, 48000 Hz and 8 channels.
# This PCM is referenced as the playback side of "aduplex" below.
pcm.loophw00 {
type hw
card Loopback
device 0
subdevice 0
format S32_LE
rate 48000
channels 8
}
# Direct hardware PCM on the "Loopback" card, device 1 / subdevice 0, with the
# same pinned parameters as loophw00 (S32_LE, 48000 Hz, 8 channels).
# This PCM is referenced as the capture side of "aduplex" below.
pcm.loophw01 {
type hw
card Loopback
device 1
subdevice 0
format S32_LE
rate 48000
channels 8
}
# Asymmetric PCM: playback is routed to loophw00 and capture is taken from
# loophw01, so a single device name ("aduplex") can be opened in both
# directions.
pcm.aduplex {
type asym
playback.pcm "loophw00"
capture.pcm "loophw01"
# Make the device visible in device listings under a readable description.
hint {
show on
description "Duplex Loopback"
}
}
#!/usr/bin/env python
"""
Uses the `soundfile <http://pysoundfile.readthedocs.io>`_ and `sounddevice
<http://python-sounddevice.readthedocs.io>`_ libraries to playback, record, or
simultaneously playback and record audio files.
Notes::
+ 24-bit streaming is currently not supported (typically 32-bit streaming gets
downconverted automatically anyway)
+ For simplicity, this app only supports 'symmetric' full duplex audio streams;
i.e., the input device and output device are assumed to be the same.
"""
from __future__ import print_function
try:
import Queue as queue
except ImportError:
import queue
try:
basestring
except NameError:
basestring = (str, bytes)
import threading
import time
import sys
import sounddevice as sd
import soundfile as sf
import traceback
TXQSIZE = 1<<16 # Number of frames to buffer for transmission
# TODO
# Replace Queue with a portaudio ring buffer in order to allow variable block
# sizes
class QueuedStreamBase(sd._StreamBase):
    """
    This class adds a python Queue for reading and writing audio data. This
    double buffers the audio data so that any processing is kept out of the time
    sensitive audio callback function. For maximum flexibility, receive queue
    data is a bytearray object; transmit queue data should be of a buffer type
    where each element is a single byte.

    Notes:

    Using a queue requires that we use a fixed, predetermined value for the
    portaudio blocksize.

    If the receive buffer fills or the transmit buffer is found empty during a
    callback the audio stream is aborted and the error is recorded through
    ``_set_exception``.

    During recording, the end of the stream is signaled by a 'None' value placed
    into the receive queue.

    During playback, the end of the stream is signaled by an item on the queue
    that is smaller than blocksize*channels*samplesize bytes. The unused tail
    of that final output buffer is filled with zeros so no stale data is played.

    Parameters
    ----------
    blocksize : int
        PortAudio block size in frames; must be non-zero for playback.
    qsize : int
        Transmit queue capacity in frames (converted to a number of blocks);
        a value <= 0 means unbounded.
    kind : {'input', 'output', 'duplex'}
        Direction of the stream.
    **kwargs
        Passed on to ``sd._StreamBase``. A user supplied ``callback`` replaces
        the queue based callbacks.
    """

    def __init__(self, blocksize=None, qsize=-1, kind='duplex', **kwargs):
        if kwargs.get('callback', None) is None:
            if kind == 'input':
                kwargs['callback'] = self.icallback
            elif kind == 'output':
                kwargs['callback'] = self.ocallback
            else:
                kwargs['callback'] = self.iocallback

        if (kind == 'output' or kind == 'duplex') and not blocksize:
            raise ValueError("Non-zero blocksize is required for playback mode.")

        # The callbacks and _closequeues() rely on these, so they must exist
        # even when this class is used without the ThreadedStreamBase layer.
        self._exit = threading.Event()
        self._exc = queue.Queue()

        kwargs['wrap_callback'] = 'buffer'
        super(QueuedStreamBase, self).__init__(kind=kind, blocksize=blocksize, **kwargs)

        self.status = sd.CallbackFlags()
        self.frame_count = 0
        self._closed = False

        if isinstance(self._device, int):
            self._devname = sd.query_devices(self._device)['name']
        else:
            self._devname = tuple(sd.query_devices(dev)['name'] for dev in self._device)

        # Bytes per frame; a (input, output) pair for duplex streams.
        if kind == 'duplex':
            self.framesize = self.samplesize[0]*self.channels[0], self.samplesize[1]*self.channels[1]
        else:
            self.framesize = self.samplesize*self.channels

        if kind == 'duplex' or kind == 'output':
            # qsize is given in frames; the queue holds whole blocks.
            if self.blocksize and qsize > 0:
                qsize = (qsize+self.blocksize-1)//self.blocksize
            self.txq = queue.Queue(qsize)
        else:
            self.txq = None

        if kind == 'duplex' or kind == 'input':
            self.rxq = queue.Queue(-1)
        else:
            self.rxq = None

    def _set_exception(self, exc=None):
        """Record the first error seen; later ones are ignored."""
        if not self._exc.empty():
            return
        if exc is None:
            exc = sys.exc_info()
        self._exc.put(exc)

    def _check_status(self, status):
        """Accumulate callback flags and abort if any of the low four bits is set."""
        self.status |= status
        # NOTE(review): 0xF presumably covers PortAudio's input/output
        # underflow and overflow flags -- confirm against PaStreamCallbackFlags.
        if status._flags & 0xF:
            self._set_exception(sd.PortAudioError(str(status)))
            raise sd.CallbackAbort

    def _receive(self, in_data):
        """Copy the input buffer into the receive queue or abort if it is full."""
        try:
            self.rxq.put_nowait(bytearray(in_data))
        except queue.Full:
            self._set_exception(queue.Full("Receive queue is full."))
            raise sd.CallbackAbort

    def _transmit(self, out_data):
        """
        Copy the next queued block into the output buffer and return its
        length in bytes. Aborts the stream if no block is available.
        """
        try:
            txbuff = self.txq.get_nowait()
        except queue.Empty:
            self._set_exception(queue.Empty("Transmit queue is empty."))
            raise sd.CallbackAbort

        nbytes = len(txbuff)
        out_data[:nbytes] = txbuff
        # A short (final) block would otherwise leave whatever was previously
        # in the buffer to be played back.
        remainder = len(out_data) - nbytes
        if remainder > 0:
            out_data[nbytes:] = b'\x00' * remainder
        return nbytes

    def icallback(self, in_data, frame_count, time_info, status):
        """PortAudio callback for input-only streams."""
        self._check_status(status)
        self._receive(in_data)
        self.frame_count += frame_count

    def ocallback(self, out_data, frame_count, time_info, status):
        """PortAudio callback for output-only streams."""
        self._check_status(status)
        nbytes = self._transmit(out_data)

        # This is our last callback!
        if nbytes < frame_count*self.framesize:
            self.frame_count += nbytes//self.framesize
            raise sd.CallbackStop

        self.frame_count += frame_count

    def iocallback(self, in_data, out_data, frame_count, time_info, status):
        """PortAudio callback for duplex streams."""
        self._check_status(status)
        nbytes = self._transmit(out_data)
        self._receive(in_data)

        # This is our last callback!
        if nbytes < frame_count*self.framesize[1]:
            self.frame_count += nbytes//self.framesize[1]
            self.rxq.put(None)  # This actually gets done in closequeues but that method doesn't work in windows
            raise sd.CallbackStop

        self.frame_count += frame_count

    def _closequeues(self):
        """
        Flush both queues once: the receive queue is emptied and terminated
        with None, the transmit queue is emptied and a blocked producer is
        woken up after the exit flag has been set.
        """
        if self._closed:
            return
        self._closed = True

        if self.rxq is not None:
            # Hold the queue's own lock while touching its internal deque.
            with self.rxq.mutex:
                self.rxq.queue.clear()
            self.rxq.put(None)

        if self.txq is not None:
            with self.txq.mutex:
                self._exit.set()
                self.txq.queue.clear()
                self.txq.not_full.notify()

    def abort(self):
        super(QueuedStreamBase, self).abort()
        self._closequeues()

    def stop(self):
        super(QueuedStreamBase, self).stop()
        self._closequeues()

    def close(self):
        super(QueuedStreamBase, self).close()
        self._closequeues()

    def __repr__(self):
        return ("{0}({1._devname!r}, samplerate={1._samplerate:.0f}, "
                "channels={1._channels}, dtype={1._dtype!r}, blocksize={1._blocksize})").format(self.__class__.__name__, self)
class ThreadedStreamBase(QueuedStreamBase):
    """
    This class builds on the QueuedStream class by adding the ability to
    register functions for reading (qreader) and writing (qwriter) audio data
    which run in their own threads. However, the qreader and qwriter threads are
    optional; this allows the use of a 'duplex' stream which e.g. has a
    dedicated thread for writing data but for which data is read in the main
    thread.

    An important advantage with this class is that it properly handles any
    exceptions raised in the qreader and qwriter threads. Specifically, if an
    exception is raised, the stream will be aborted and the exception re-raised
    in the main thread (when the stream is closed).

    Parameters
    ----------
    qreader : callable(stream, rxq), optional
        Consumes the receive queue in its own thread (input/duplex only).
    qwriter : callable(stream, txq), optional
        Fills the transmit queue in its own thread (output/duplex only).
    """

    def __init__(self, blocksize=None, qreader=None, qwriter=None, kind='duplex', **kwargs):
        super(ThreadedStreamBase, self).__init__(kind=kind, blocksize=blocksize, **kwargs)

        self._exit = threading.Event()
        self._exc = queue.Queue()

        if (kind == 'duplex' or kind == 'output') and qwriter is not None:
            txt = threading.Thread(target=self._qrwwrapper, args=(self.txq, qwriter))
            txt.daemon = True
            self.txt = txt
        else:
            self.txt = None

        if (kind == 'duplex' or kind == 'input') and qreader is not None:
            rxt = threading.Thread(target=self._qrwwrapper, args=(self.rxq, qreader))
            rxt.daemon = True
            self.rxt = rxt
        else:
            self.rxt = None

    def _raise_exceptions(self):
        """Re-raise the recorded error, if any, in the calling thread."""
        if self._exc.empty():
            return

        exc = self._exc.get()
        if not isinstance(exc, tuple):
            raise exc

        exctype, excval, exctb = exc
        # Re-raise the original instance instead of wrapping it in a new one;
        # wrapping breaks exception types whose constructors need other
        # arguments and hides the original attributes.
        if not isinstance(excval, BaseException):
            excval = exctype() if excval is None else exctype(excval)
        if hasattr(excval, 'with_traceback'):
            raise excval.with_traceback(exctb)
        # Python 2: no with_traceback(); show the original traceback instead.
        traceback.print_tb(exctb)
        raise excval

    def _set_exception(self, exc=None):
        """Record `exc` (default: the exception being handled) if none is stored yet."""
        # ignore subsequent exceptions
        if not self._exc.empty():
            return
        if exc is None:
            exc = sys.exc_info()
        self._exc.put(exc)

    def _qrwwrapper(self, queue, qrwfunc):
        """
        Wrapper function for the qreader and qwriter threads which acts as a
        kind of context manager.
        """
        try:
            qrwfunc(self, queue)
        except BaseException:
            # Raise the exception in the main thread
            self._set_exception()

            # suppress the exception in this child thread; aborting is best
            # effort since the stream may already be stopped or closed
            try:
                self.abort()
            except Exception:
                pass

    def _stopiothreads(self):
        """Wait for the reader/writer threads (never joining the current thread)."""
        currthread = threading.current_thread()

        if self.rxt is not None and self.rxt.is_alive() and self.rxt != currthread:
            self.rxt.join()

        if self.txt is not None and self.txt.is_alive() and self.txt != currthread:
            self.txt.join()

    def start(self):
        if self.txt is not None:
            # Prime the buffer
            self.txt.start()
            while not self.txq.full() and self.txt.is_alive():
                time.sleep(0.001)

        if self.rxt is not None:
            self.rxt.start()

        super(ThreadedStreamBase, self).start()

    def abort(self):
        super(ThreadedStreamBase, self).abort()
        self._stopiothreads()

    def stop(self):
        super(ThreadedStreamBase, self).stop()
        self._stopiothreads()

    def close(self):
        super(ThreadedStreamBase, self).close()
        self._stopiothreads()
        self._raise_exceptions()
def _soundfilewriter(stream, rxq):
    """
    Default qreader: drain `rxq` into ``stream.out_fh`` until a None item is
    received.

    Raises queue.Empty if no item arrives within one second.
    """
    # Duplex streams carry a pair of dtypes; the first entry is used here.
    dtype = stream.dtype
    if not isinstance(dtype, basestring):
        dtype = dtype[0]

    while True:
        try:
            chunk = rxq.get(timeout=1)
        except queue.Empty:
            raise queue.Empty("Timed out waiting for data.")

        if chunk is None:
            return

        stream.out_fh.buffer_write(chunk, dtype=dtype)
def _soundfilereader(stream, txq):
    """
    Default qwriter: read blocks of ``stream.fileblocksize`` frames from
    ``stream.inp_fh`` and put them on `txq`.

    Stops after queueing the first short block (which marks the end of the
    file) or as soon as the stream's exit flag is set.
    """
    # Duplex streams carry (input, output) pairs; plain ints/strings raise
    # TypeError on indexing and are used as-is.
    try:
        framesize, dtype = stream.framesize[1], stream.dtype[1]
    except TypeError:
        framesize, dtype = stream.framesize, stream.dtype

    scratch = bytearray(stream.fileblocksize*framesize)
    while True:
        if stream._exit.is_set():
            return

        nread = stream.inp_fh.buffer_read_into(scratch, dtype=dtype)
        txq.put(bytearray(scratch[:nread*framesize]))

        if nread < stream.fileblocksize:
            return
class SoundFileStreamBase(ThreadedStreamBase):
    """
    This helper class basically gives you two things:

        1) it provides complete qreader and qwriter functions for SoundFile
           objects (or anything that can be opened as a SoundFile object)

        2) it automatically sets parameters for the stream based on the input
           file and automatically sets parameters for the output file based on
           the output stream.

    Parameters
    ----------
    inpf, outf : SoundFile or anything ``sf.SoundFile`` can open, optional
        Playback source / recording destination. Files opened here are closed
        by ``close()``; SoundFile objects passed in are left open.
    fileblocksize : int, optional
        Frames per file read; defaults to the stream blocksize.
    sfkwargs : dict, optional
        Extra arguments for opening the output file. The dict is copied, the
        caller's object is never modified.
    """

    def __init__(self, inpf=None, outf=None, fileblocksize=None, qreader=None, qwriter=None, kind='duplex', sfkwargs=None, **kwargs):
        # Work on a private copy: defaults are filled in below and must not
        # leak into the caller's dict (or into a shared default argument).
        sfkwargs = dict(sfkwargs) if sfkwargs else {}

        # We're playing an audio file, so we can safely assume there's no need
        # to clip
        if kwargs.get('clip_off', None) is None:
            kwargs['clip_off'] = True

        if kwargs.get('blocksize', None) is None:
            kwargs['blocksize'] = fileblocksize

        # At this point we don't care what 'kind' the stream is, only whether
        # the input/output is None which determines whether qreader/qwriter
        # functions should be registered
        self._inpf = inpf
        inp_fh = inpf
        if inpf is not None:
            if not isinstance(inpf, sf.SoundFile):
                inp_fh = sf.SoundFile(inpf)
            if kwargs.get('samplerate', None) is None:
                kwargs['samplerate'] = inp_fh.samplerate
            if kwargs.get('channels', None) is None:
                kwargs['channels'] = inp_fh.channels
            if qwriter is None:
                qwriter = _soundfilereader

        # We need to set the qreader here; output file parameters will be
        # known once we open the stream
        self._outf = outf
        if outf is not None and qreader is None:
            qreader = _soundfilewriter

        try:
            super(SoundFileStreamBase, self).__init__(qreader=qreader,
                                                      qwriter=qwriter,
                                                      kind=kind, **kwargs)
        except Exception:
            # Don't leak an input file we opened ourselves.
            if inp_fh is not inpf:
                inp_fh.close()
            raise

        # From here on close() knows how to release everything.
        self.inp_fh = inp_fh
        self.out_fh = outf
        self.fileblocksize = fileblocksize or self.blocksize

        if outf is not None and not isinstance(outf, sf.SoundFile):
            try:
                self.out_fh = self._open_output(outf, inp_fh, kind, sfkwargs)
            except Exception:
                self.close()
                raise

    def _open_output(self, outf, inp_fh, kind, sfkwargs):
        """Open `outf` for writing, defaulting its parameters from the stream and input file."""
        # Try and determine the file extension here; we need to know it if we
        # want to try and set a default subtype for the output
        try:
            outext = getattr(outf, 'name', outf).rsplit('.', 1)[1].lower()
        except (AttributeError, IndexError):
            # not a string-like name, or a name without an extension
            outext = None

        if isinstance(inp_fh, sf.SoundFile):
            if sfkwargs.get('endian', None) is None:
                sfkwargs['endian'] = inp_fh.endian
            # An explicit format wins; a missing *or None* format falls back
            # to the file extension.
            fmt = sfkwargs.get('format', None) or outext
            if (fmt is not None and fmt.lower() == inp_fh.format.lower()
                    and sfkwargs.get('subtype', None) is None):
                sfkwargs['subtype'] = inp_fh.subtype

        if sfkwargs.get('channels', None) is None:
            sfkwargs['channels'] = self.channels[0] if kind == 'duplex' else self.channels
        if sfkwargs.get('samplerate', None) is None:
            sfkwargs['samplerate'] = int(self.samplerate)
        if sfkwargs.get('mode', None) is None:
            sfkwargs['mode'] = 'w+b'

        return sf.SoundFile(outf, **sfkwargs)

    def close(self):
        """Close the stream, then any file that was opened by this object."""
        try:
            super(SoundFileStreamBase, self).close()
        finally:
            try:
                if self.out_fh is not self._outf:
                    self.out_fh.close()
            finally:
                if self.inp_fh is not self._inpf:
                    self.inp_fh.close()
class SoundFileInputStream(SoundFileStreamBase):
    """
    Input-only (kind='input') variant of SoundFileStreamBase bound to the
    output file `outf`.

    `sfkwargs` holds optional arguments for opening the output file. It is
    copied before being passed on, so neither the caller's dict nor a shared
    default is ever modified.
    """

    def __init__(self, outf, sfkwargs=None, **kwargs):
        sfkwargs = {} if sfkwargs is None else dict(sfkwargs)
        super(SoundFileInputStream, self).__init__(outf=outf, kind='input', sfkwargs=sfkwargs, **kwargs)
class SoundFileOutputStream(SoundFileStreamBase):
    """
    Output-only (kind='output') variant of SoundFileStreamBase bound to the
    input file `inpf`.

    `qsize` is the transmit queue size in frames and `fileblocksize` the
    number of frames per file read; both are handed to the base class
    unchanged.
    """

    def __init__(self, inpf, qsize=TXQSIZE, fileblocksize=None, **kwargs):
        base_init = super(SoundFileOutputStream, self).__init__
        base_init(inpf=inpf, fileblocksize=fileblocksize, kind='output',
                  qsize=qsize, **kwargs)
class SoundFileStream(SoundFileStreamBase):
    """
    Duplex stream that plays `inpf` and/or records to `outf`.

    Note that only one of inpf and outf is required. This allows you to e.g. use
    a SoundFile as input but implement your own qreader and/or read from the
    queue in the main thread.

    `sfkwargs` (optional dict) is forwarded to the base class for opening the
    output file; it is copied first so the caller's dict is left untouched.

    Raises ValueError if neither file is given.
    """

    def __init__(self, inpf=None, outf=None, qsize=TXQSIZE, sfkwargs=None, fileblocksize=None, **kwargs):
        # If you're not using soundfiles for the input or the output, then you
        # should probably be using the Queued or ThreadedStream class
        if inpf is None and outf is None:
            raise ValueError("No input or output file given.")

        # Previously accepted but silently dropped: the output file options
        # must reach the base class to have any effect.
        sfkwargs = {} if sfkwargs is None else dict(sfkwargs)
        super(SoundFileStream, self).__init__(inpf=inpf, outf=outf, qsize=qsize,
                                              fileblocksize=fileblocksize,
                                              kind='duplex', sfkwargs=sfkwargs,
                                              **kwargs)
def blockstream(inpf=None, blocksize=1024, overlap=0, always_2d=False, copy=False, **kwargs):
    """
    Generate (optionally overlapping) numpy blocks of received audio.

    Parameters
    ----------
    inpf : optional
        Playback file; if given a duplex SoundFileStream is used, otherwise an
        input-only QueuedStreamBase.
    blocksize : int
        Number of frames in every yielded block.
    overlap : int
        Number of frames shared between consecutive blocks
        (0 <= overlap < blocksize).
    always_2d : bool
        Yield (frames, channels) arrays even for a single channel. Forced on
        for multichannel streams.
    copy : bool
        Yield an independent copy of the block. If False the same internal
        buffer is yielded every time and is overwritten on the next iteration.
    **kwargs
        Passed on to the stream constructor.

    Raises
    ------
    ValueError
        If `overlap` is outside [0, blocksize).
    queue.Empty
        If no data arrives within one second while the stream is active.
    """
    import numpy as np

    assert blocksize is not None and blocksize > 0, "Requires a fixed known blocksize"
    if not 0 <= overlap < blocksize:
        raise ValueError("overlap must satisfy 0 <= overlap < blocksize.")

    # Each stream block contributes only the non-overlapping part.
    incframes = blocksize-overlap
    if inpf is None:
        stream = QueuedStreamBase(kind='input', blocksize=incframes, **kwargs)
        dtype = stream.dtype
        channels = stream.channels
    else:
        stream = SoundFileStream(inpf=inpf, blocksize=incframes, **kwargs)
        dtype = stream.dtype[0]
        channels = stream.channels[0]

    rxqueue = stream.rxq

    if channels > 1:
        always_2d = True

    outbuff = np.zeros((blocksize, channels) if always_2d else blocksize*channels, dtype=dtype)

    with stream:
        while stream.active:
            try:
                item = rxqueue.get(timeout=1)
            except queue.Empty:
                raise queue.Empty("Timed out waiting for data.")

            if item is None:
                break

            rxbuff = np.frombuffer(item, dtype=dtype)
            if always_2d:
                rxbuff.shape = (len(rxbuff)//channels, channels)
            outbuff[overlap:overlap+len(rxbuff)] = rxbuff

            # outbuff[:] would only be another view of the same memory.
            yield outbuff.copy() if copy else outbuff

            # Slide the tail of this block to the front for the next one.
            outbuff[:-incframes] = outbuff[incframes:]
# Used just for the pastream app
def SoundFileStreamFactory(inpf=None, outf=None, **kwargs):
    """
    Build the stream class matching the given files.

    Both files select SoundFileStream, only `inpf` selects
    SoundFileOutputStream and only `outf` selects SoundFileInputStream.
    Keyword arguments that do not apply to the one-directional classes are
    dropped before the call. Exits via SystemExit if neither file is given.
    """
    has_input = inpf is not None
    has_output = outf is not None

    if not (has_input or has_output):
        raise SystemExit("No input or output selected.")

    if has_input and has_output:
        return SoundFileStream(inpf, outf, **kwargs)

    if has_input:
        # playback only: no output file options, no receive thread
        for unused in ('sfkwargs', 'qreader'):
            kwargs.pop(unused, None)
        return SoundFileOutputStream(inpf, **kwargs)

    # recording only: nothing is transmitted
    for unused in ('qsize', 'fileblocksize', 'qwriter'):
        kwargs.pop(unused, None)
    return SoundFileInputStream(outf, **kwargs)
def get_parser(parser=None):
    """
    Add the command line options of this app to `parser` and return it.

    If `parser` is None a new ArgumentParser is created which uses the module
    docstring as description and accepts '@file' arguments holding
    whitespace separated options.
    """
    from argparse import (Action, ArgumentParser, ArgumentTypeError,
                          RawDescriptionHelpFormatter)

    if parser is None:
        parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
                                fromfile_prefix_chars='@',
                                description=__doc__)
        parser.convert_arg_line_to_args = lambda arg_line: arg_line.split()

    class ListStreamsAction(Action):
        """Print the available devices and exit."""
        def __call__(*args, **kwargs):
            print(sd.query_devices())
            sys.exit(0)

    def devtype(dev):
        # Devices may be given by index or by name.
        try:
            return int(dev)
        except ValueError:
            return dev

    def posint(intarg):
        intarg = int(intarg)
        # argparse only turns ArgumentTypeError/TypeError/ValueError into a
        # usage error; an assert would escape as a traceback (and vanish
        # under -O).
        if intarg <= 0:
            raise ArgumentTypeError("Queue size must be a positive value.")
        return intarg

    parser.add_argument("input", type=lambda x: None if x == '-' else x,
                        help="Input audio file, or, use the special designator '-' for recording only.")

    parser.add_argument("output", type=lambda x: None if x == '-' else x,
                        help="Output recording file, or, use the special designator '-' for playback only.")

    parser.add_argument("-l", action=ListStreamsAction, nargs=0,
                        help="List available audio device streams.")

    parser.add_argument("-q", "--qsize", type=posint, default=TXQSIZE,
                        help="File transmit buffer size (in units of frames). Increase for smaller blocksizes.")

    devopts = parser.add_argument_group("I/O device stream options")

    devopts.add_argument("-d", "--device", type=devtype,
                         help="Audio device name expression or index number. Defaults to the PortAudio default device.")

    devopts.add_argument("-b", "--blocksize", type=int, default=2048,
                         help="PortAudio buffer size and file block size (in units of frames).")

    devopts.add_argument("-B", "--file-blocksize", type=int,
                         help="Only used for special cases where you want to set the file block size\n"
                              "differently than the portaudio buffer size. You most likely don't need to use\n"
                              "this option. (In units of frames).")

    # choices must be real containers: argparse tests membership and formats
    # the help text from them, which exhausts a one-shot iterator such as the
    # result of map() on Python 3.
    devopts.add_argument("-f", "--format", dest='dtype',
                         choices=list(sd._sampleformats),
                         help='''Sample format of device I/O stream.''')

    devopts.add_argument("-r", "--rate", dest='samplerate',
                         type=lambda x: int(float(x[:-1])*1000) if x.endswith('k') else int(x),
                         help="Sample rate in Hz. Add a 'k' suffix to specify kHz.")

    fileopts = parser.add_argument_group("Output file format options")

    fileopts.add_argument("-t", dest="file_type", metavar='FILE_TYPE',
                          choices=[fmt.lower() for fmt in sf.available_formats()],
                          help="Output file type. Typically this is determined from the file extension, but it\n"
                               "can be manually overridden here.")

    fileopts.add_argument("--endian", choices=['file', 'big', 'little'],
                          help="Byte endianness.")

    fileopts.add_argument("-e", "--encoding",
                          choices=[subtype.lower() for subtype in sf.available_subtypes()],
                          help="Sample format encoding.")

    return parser
def main(argv=None):
    """
    Command line entry point: stream the files given in `argv` (default
    ``sys.argv[1:]``) while printing a progress line every 100 ms.

    Returns 0; Ctrl-C ends the stream without an error.
    """
    if argv is None:
        argv = sys.argv[1:]
    parser = get_parser()
    args = parser.parse_args(argv)

    sfkwargs = dict(endian=args.endian, subtype=args.encoding, format=args.file_type)
    stream = SoundFileStreamFactory(args.input, args.output, qsize=args.qsize,
                                    sfkwargs=sfkwargs, device=args.device,
                                    dtype=args.dtype,
                                    samplerate=args.samplerate,
                                    blocksize=args.blocksize,
                                    fileblocksize=args.file_blocksize)

    statline = "\r{:8.3f}s {:10d} frames processed, {:>10s} frames queued, {:>10s} frames received\r"
    print("<-", stream.inp_fh if stream.inp_fh is not None else 'null')
    print("--", stream)
    print("->", stream.out_fh if stream.out_fh is not None else 'null')

    # Columns that cannot be computed are shown as 'n/a'.
    nullinp = nullout = None
    if stream.inp_fh is None:
        nullinp = 'n/a'
    # seekable is a method; without the call the test was always False and
    # tell() was used even on non-seekable outputs.
    if stream.out_fh is None or not stream.out_fh.seekable():
        nullout = 'n/a'

    try:
        with stream:
            t1 = time.time()
            while stream.active:
                time.sleep(0.1)
                line = statline.format(time.time()-t1, stream.frame_count,
                                       nullinp or str(stream.txq.qsize()*stream.fileblocksize),
                                       nullout or str(stream.out_fh.tell()))
                sys.stdout.write(line)
                sys.stdout.flush()
    except KeyboardInterrupt:
        pass
    finally:
        print()

    return 0
# Run the command line app when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
#!/usr/bin/env python
"""
Uses the `soundfile <http://pysoundfile.readthedocs.io>`_ and `sounddevice
<http://python-sounddevice.readthedocs.io>`_ libraries to playback, record, or
simultaneously playback and record audio files.
Notes::
+ 24-bit streaming is currently not supported (typically 32-bit streaming gets
downconverted automatically anyway)
+ For simplicity, this app only supports 'symmetric' full duplex audio streams;
i.e., the input device and output device are assumed to be the same.
"""
from __future__ import print_function
try:
import Queue as queue
except ImportError:
import queue
try:
basestring
except NameError:
basestring = (str, bytes)
import threading
import time
import sys
import sounddevice as sd
import soundfile as sf
import traceback
# import rtmixer
TXQSIZE = 1<<16 # Number of frames to buffer for transmission
_dtype2ctype = {'int32': 'int', 'int24': 'int', 'int16': 'short', 'float32': 'float'}
# def _get(ringbuff, timeout=1):
# t1 = time.time()
# while time.time()-t1 < timeout:
# nframes = ringbuff.read_available
# if nframes < stream.fileblocksize:
# time.sleep(0.02)
# else:
# nframes, buff1, buff2 = ringbuff.get_read_buffers(nframes)
# nframes = stream.out_fh.buffer_write(buff1, ctype)
# raise queue.Empty("Timed out waiting for data.")
# def _rbreader(stream, ringbuff):
# if isinstance(stream.dtype, basestring):
# dtype = stream.dtype
# else:
# dtype = stream.dtype[0]
# ctype = _dtype2ctype[dtype]
# while True:
# try:
# item = ringbuff.get(timeout=1)
# except queue.Empty:
# raise queue.Empty("Timed out waiting for data.")
# if item is None: break
# nframes = stream.out_fh.buffer_write(buff1, ctype)
# ringbuff.advance_read_index(nframes)
# def _rbwriter(stream, ringbuff):
# try:
# samplesize = stream.samplesize[1]
# dtype = stream.dtype[1]
# except TypeError:
# samplesize = stream.samplesize
# dtype = stream.dtype
# ctype = _dtype2ctype[dtype]
# while not stream._exit.is_set():
# nframes, buff1, buff2 = ringbuff.get_write_buffers(stream.fileblocksize)
# nframes = stream.inp_fh.buffer_read_into(buff1, ctype)
# if nframes < stream.fileblocksize:
# if stream.loop == 0:
# stream._exit.set()
# else:
# stream.loop -= 1
# stream.inp_fh.seek(0)
# nframes += stream.inp_fh.buffer_read_into(buff[nframes*stream.framesize:], ctype)
# ringbuff.advance_write_index(nframes)
class QueuedStreamBase(sd._StreamBase):
    """
    PortAudio stream whose callbacks exchange raw audio blocks with the rest
    of the program through python Queues (``rxq`` for received data, ``txq``
    for data to be played).

    If the receive queue is full or the transmit queue is empty during a
    callback, the stream is aborted and the error is recorded through
    ``_set_exception``. The end of a recording is signaled by a None item on
    the receive queue; the end of playback by a queued block shorter than one
    full PortAudio block, whose unused output tail is filled with zeros.

    Parameters
    ----------
    blocksize : int
        PortAudio block size in frames; must be non-zero for playback.
    qsize : int
        Transmit queue capacity in frames (converted to a number of blocks);
        a value <= 0 means unbounded.
    kind : {'input', 'output', None}
        Direction of the stream; None selects a duplex stream.
    **kwargs
        Passed on to ``sd._StreamBase``. A user supplied ``callback`` replaces
        the queue based callbacks.
    """

    def __init__(self, blocksize=None, qsize=-1, kind=None, **kwargs):
        if kwargs.get('callback', None) is None:
            if kind == 'input':
                kwargs['callback'] = self.icallback
            elif kind == 'output':
                kwargs['callback'] = self.ocallback
            else:
                kwargs['callback'] = self.iocallback

        if (kind == 'output' or kind is None) and not blocksize:
            raise ValueError("Non-zero blocksize is required for playback mode.")

        # The callbacks and _closequeues() rely on these, so they must exist
        # even when this class is used without the ThreadedStreamBase layer
        # (e.g. through QueuedInputStream).
        self._exit = threading.Event()
        self._exc = queue.Queue()

        kwargs['wrap_callback'] = 'buffer'
        super(QueuedStreamBase, self).__init__(kind=kind, blocksize=blocksize, **kwargs)

        self.status = sd.CallbackFlags()
        self.frame_count = 0
        self._closed = False

        if isinstance(self._device, int):
            self._devname = sd.query_devices(self._device)['name']
        else:
            self._devname = tuple(sd.query_devices(dev)['name'] for dev in self._device)

        # Bytes per frame; a (input, output) pair for duplex streams.
        if kind is None:
            self.framesize = self.samplesize[0]*self.channels[0], self.samplesize[1]*self.channels[1]
        else:
            self.framesize = self.samplesize*self.channels

        if kind is None or kind == 'output':
            # qsize is given in frames; the queue holds whole blocks.
            if self.blocksize and qsize > 0:
                qsize = (qsize+self.blocksize-1)//self.blocksize
            self.txq = queue.Queue(qsize)
        else:
            self.txq = None

        if kind is None or kind == 'input':
            self.rxq = queue.Queue(-1)
        else:
            self.rxq = None

    def _set_exception(self, exc=None):
        """Record the first error seen; later ones are ignored."""
        if not self._exc.empty():
            return
        if exc is None:
            exc = sys.exc_info()
        self._exc.put(exc)

    def _check_status(self, status):
        """Accumulate callback flags and abort if any of the low four bits is set."""
        self.status |= status
        # NOTE(review): 0xF presumably covers PortAudio's input/output
        # underflow and overflow flags -- confirm against PaStreamCallbackFlags.
        if status._flags & 0xF:
            self._set_exception(sd.PortAudioError(str(status)))
            raise sd.CallbackAbort

    def _receive(self, in_data):
        """Copy the input buffer into the receive queue or abort if it is full."""
        try:
            self.rxq.put_nowait(bytearray(in_data))
        except queue.Full:
            self._set_exception(queue.Full("Receive queue is full."))
            raise sd.CallbackAbort

    def _transmit(self, out_data):
        """
        Copy the next queued block into the output buffer and return its
        length in bytes. Aborts the stream if no block is available.
        """
        try:
            txbuff = self.txq.get_nowait()
        except queue.Empty:
            self._set_exception(queue.Empty("Transmit queue is empty."))
            raise sd.CallbackAbort

        nbytes = len(txbuff)
        out_data[:nbytes] = txbuff
        # A short (final) block would otherwise leave whatever was previously
        # in the buffer to be played back.
        remainder = len(out_data) - nbytes
        if remainder > 0:
            out_data[nbytes:] = b'\x00' * remainder
        return nbytes

    def icallback(self, in_data, frame_count, time_info, status):
        """PortAudio callback for input-only streams."""
        self._check_status(status)

        # Input delivered while the output is still being primed is skipped.
        if not status.priming_output:
            self._receive(in_data)
            self.frame_count += frame_count

    def ocallback(self, out_data, frame_count, time_info, status):
        """PortAudio callback for output-only streams."""
        self._check_status(status)
        nbytes = self._transmit(out_data)

        # This is our last callback!
        if nbytes < frame_count*self.framesize:
            self.frame_count += nbytes//self.framesize
            raise sd.CallbackStop

        self.frame_count += frame_count

    def iocallback(self, in_data, out_data, frame_count, time_info, status):
        """PortAudio callback for duplex streams."""
        self._check_status(status)
        nbytes = self._transmit(out_data)
        self._receive(in_data)

        # This is our last callback!
        if nbytes < frame_count*self.framesize[1]:
            self.frame_count += nbytes//self.framesize[1]
            raise sd.CallbackStop

        self.frame_count += frame_count

    def _closequeues(self):
        """
        Flush both queues once: the receive queue is emptied and terminated
        with None, the transmit queue is emptied and a blocked producer is
        woken up after the exit flag has been set.
        """
        if self._closed:
            return
        self._closed = True

        if self.rxq is not None:
            # Hold the queue's own lock while touching its internal deque.
            with self.rxq.mutex:
                self.rxq.queue.clear()
            self.rxq.put(None)

        if self.txq is not None:
            with self.txq.mutex:
                self._exit.set()
                self.txq.queue.clear()
                self.txq.not_full.notify()

    def abort(self):
        super(QueuedStreamBase, self).abort()
        self._closequeues()

    def stop(self):
        super(QueuedStreamBase, self).stop()
        self._closequeues()

    def close(self):
        super(QueuedStreamBase, self).close()
        self._closequeues()

    def __repr__(self):
        return ("{0}({1._devname!r}, samplerate={1._samplerate:.0f}, "
                "channels={1._channels}, dtype={1._dtype!r}, blocksize={1._blocksize})").format(self.__class__.__name__, self)
class QueuedInputStream(QueuedStreamBase):
    """
    Input-only queued stream that can be used as an iterator over the blocks
    placed on its receive queue.

    Iteration enters the stream's context manager; when the None end marker
    is read the context is exited and iteration stops. Waiting more than one
    second for a block raises queue.Empty.
    """

    def __init__(self, blocksize=None, **kwargs):
        super(QueuedInputStream, self).__init__(kind='input', blocksize=blocksize, **kwargs)

    def __iter__(self):
        return self.__enter__()

    def next(self):
        try:
            block = self.rxq.get(timeout=1)
        except queue.Empty:
            raise queue.Empty("Timed out waiting for data.")

        if block is not None:
            return block

        # End marker reached: leave the stream context before stopping.
        self.__exit__()
        raise StopIteration

    # Python 3 spelling of the iterator protocol
    __next__ = next
class ThreadedStreamBase(QueuedStreamBase):
    """
    Queued stream with optional worker threads that service the queues.

    `qreader(stream, rxq)` consumes the receive queue and
    `qwriter(stream, txq)` fills the transmit queue, each in its own daemon
    thread. An exception raised in either thread aborts the stream and is
    re-raised in the thread that calls ``close()``.

    `kind` is 'input', 'output' or None (duplex); a worker is only created
    when it applies to the stream direction.
    """

    def __init__(self, blocksize=None, qreader=None, qwriter=None, kind=None, **kwargs):
        super(ThreadedStreamBase, self).__init__(kind=kind, blocksize=blocksize, **kwargs)

        self._exit = threading.Event()
        self._exc = queue.Queue()

        if (kind is None or kind == 'output') and qwriter is not None:
            txt = threading.Thread(target=self._qrwwrapper, args=(self.txq, qwriter))
            txt.daemon = True
            self.txt = txt
        else:
            self.txt = None

        if (kind is None or kind == 'input') and qreader is not None:
            rxt = threading.Thread(target=self._qrwwrapper, args=(self.rxq, qreader))
            rxt.daemon = True
            self.rxt = rxt
        else:
            self.rxt = None

    def _raise_exceptions(self):
        """Re-raise the recorded error, if any, in the calling thread."""
        if self._exc.empty():
            return

        exc = self._exc.get()
        if not isinstance(exc, tuple):
            raise exc

        exctype, excval, exctb = exc
        # Re-raise the original instance instead of wrapping it in a new one;
        # wrapping breaks exception types whose constructors need other
        # arguments and hides the original attributes.
        if not isinstance(excval, BaseException):
            excval = exctype() if excval is None else exctype(excval)
        if hasattr(excval, 'with_traceback'):
            raise excval.with_traceback(exctb)
        # Python 2 (the module still carries Python 2 shims): there is no
        # with_traceback(), so show the original traceback instead.
        traceback.print_tb(exctb)
        raise excval

    def _set_exception(self, exc=None):
        """Record `exc` (default: the exception being handled) if none is stored yet."""
        # ignore subsequent exceptions
        if not self._exc.empty():
            return
        if exc is None:
            exc = sys.exc_info()
        self._exc.put(exc)

    def _qrwwrapper(self, queue, qrwfunc):
        """Thread target: run `qrwfunc` and forward any exception to the main thread."""
        try:
            qrwfunc(self, queue)
        except BaseException:
            # Raise the exception in the main thread
            self._set_exception()

            # suppress the exception in this child thread; aborting is best
            # effort since the stream may already be stopped or closed
            try:
                self.abort()
            except Exception:
                pass

    def _stopiothreads(self):
        """Wait for the reader/writer threads (never joining the current thread)."""
        currthread = threading.current_thread()

        if self.rxt is not None and self.rxt.is_alive() and self.rxt != currthread:
            self.rxt.join()

        if self.txt is not None and self.txt.is_alive() and self.txt != currthread:
            self.txt.join()

    def start(self):
        if self.txt is not None:
            # Prime the buffer
            self.txt.start()
            while not self.txq.full() and self.txt.is_alive():
                time.sleep(0.001)

        if self.rxt is not None:
            self.rxt.start()

        super(ThreadedStreamBase, self).start()

    def abort(self):
        super(ThreadedStreamBase, self).abort()
        self._stopiothreads()

    def stop(self):
        super(ThreadedStreamBase, self).stop()
        self._stopiothreads()

    def close(self):
        super(ThreadedStreamBase, self).close()
        self._stopiothreads()
        self._raise_exceptions()
def _soundfilewriter(stream, rxq):
    """
    Default qreader: drain `rxq` into ``stream.out_fh`` until a None item is
    received.

    Raises queue.Empty if no item arrives within one second.
    """
    # Duplex streams carry a pair of dtypes; the first entry is used here.
    dtype = stream.dtype
    if not isinstance(dtype, basestring):
        dtype = dtype[0]

    while True:
        try:
            chunk = rxq.get(timeout=1)
        except queue.Empty:
            raise queue.Empty("Timed out waiting for data.")

        if chunk is None:
            return

        stream.out_fh.buffer_write(chunk, dtype=dtype)
def _soundfilereader(stream, txq):
    """
    Default qwriter: read blocks of ``stream.fileblocksize`` frames from
    ``stream.inp_fh`` and put them on `txq`.

    When the end of the file is reached and ``stream.loop`` is non-zero, the
    file is rewound (decrementing ``stream.loop``) and the current block is
    topped up from the start of the file. Once no loops remain, the final,
    short block is queued and the stream's exit flag is set. A stream without
    a ``loop`` attribute is treated as ``loop == 0``.
    """
    # Duplex streams carry (input, output) pairs; plain ints/strings raise
    # TypeError on indexing and are used as-is.
    try:
        framesize, dtype = stream.framesize[1], stream.dtype[1]
    except TypeError:
        framesize, dtype = stream.framesize, stream.dtype

    blockframes = stream.fileblocksize
    buff = bytearray(blockframes*framesize)
    # Slicing a bytearray copies it; a memoryview slice lets the refill after
    # a rewind land in `buff` itself.
    view = memoryview(buff)

    while not stream._exit.is_set():
        nframes = stream.inp_fh.buffer_read_into(buff, dtype=dtype)

        # Short read == end of file (the buffer holds fileblocksize frames).
        while nframes < blockframes:
            if getattr(stream, 'loop', 0) == 0:
                stream._exit.set()
                break
            stream.loop -= 1
            stream.inp_fh.seek(0)
            nread = stream.inp_fh.buffer_read_into(view[nframes*framesize:], dtype=dtype)
            if nread == 0:
                # Nothing to loop over (empty file): don't spin forever.
                stream._exit.set()
                break
            nframes += nread

        txq.put(bytearray(buff[:nframes*framesize]))
class SoundFileStreamBase(ThreadedStreamBase):
    """
    Threaded stream that plays from and/or records to sound files.

    Stream parameters (samplerate, channels) default to those of the input
    file; output file parameters default to those of the opened stream and,
    where applicable, of the input file.

    Parameters
    ----------
    inpf, outf : SoundFile or anything ``sf.SoundFile`` can open, optional
        Playback source / recording destination. Files opened here are closed
        by ``close()``; SoundFile objects passed in are left open.
    fileblocksize : int, optional
        Frames per file read; defaults to the stream blocksize.
    qreader, qwriter : callable, optional
        Override the default file writer / file reader threads. The default
        file writer is registered whenever `outf` is given.
    kind : {'input', 'output', None}
        Stream direction; None selects duplex.
    sfkwargs : dict, optional (keyword only, taken from **kwargs)
        Extra arguments for opening the output file. The dict is copied.
    """

    def __init__(self, inpf=None, outf=None, fileblocksize=None, qreader=None, qwriter=None, kind=None, **kwargs):
        # sfkwargs is meant for SoundFile only; it must not be passed down to
        # the PortAudio stream and may be absent altogether.
        sfkwargs = dict(kwargs.pop('sfkwargs', None) or {})

        # We're playing an audio file, so we can safely assume there's no need
        # to clip
        if kwargs.get('clip_off', None) is None:
            kwargs['clip_off'] = True

        if kwargs.get('blocksize', None) is None:
            kwargs['blocksize'] = fileblocksize

        self._inpf = inpf
        if inpf is not None and not isinstance(inpf, sf.SoundFile):
            inp_fh = sf.SoundFile(inpf)
        else:
            inp_fh = inpf

        if isinstance(inp_fh, sf.SoundFile):
            if kwargs.get('samplerate', None) is None:
                kwargs['samplerate'] = inp_fh.samplerate
            if kwargs.get('channels', None) is None:
                kwargs['channels'] = inp_fh.channels
            if qwriter is None:
                qwriter = _soundfilereader

        self._outf = outf
        if outf is not None and qreader is None:
            qreader = _soundfilewriter

        # The stream has to be opened first: the output file defaults below
        # are taken from the stream's channels and samplerate.
        try:
            super(SoundFileStreamBase, self).__init__(qreader=qreader,
                                                      qwriter=qwriter,
                                                      kind=kind, **kwargs)
        except Exception:
            # Don't leak an input file we opened ourselves.
            if inp_fh is not inpf:
                inp_fh.close()
            raise

        # From here on close() knows how to release everything.
        self.inp_fh = inp_fh
        self.out_fh = outf
        self.fileblocksize = fileblocksize or self.blocksize

        if outf is not None and not isinstance(outf, sf.SoundFile):
            try:
                self.out_fh = self._open_output(outf, inp_fh, kind, sfkwargs)
            except Exception:
                self.close()
                raise

    def _open_output(self, outf, inp_fh, kind, sfkwargs):
        """Open `outf` for writing, defaulting its parameters from the stream and input file."""
        try:
            outext = getattr(outf, 'name', outf).rsplit('.', 1)[1].lower()
        except (AttributeError, IndexError):
            # not a string-like name, or a name without an extension
            outext = None

        if isinstance(inp_fh, sf.SoundFile):
            if sfkwargs.get('endian', None) is None:
                sfkwargs['endian'] = inp_fh.endian
            # An explicit format wins; a missing *or None* format falls back
            # to the file extension.
            fmt = sfkwargs.get('format', None) or outext
            if (fmt is not None and fmt.lower() == inp_fh.format.lower()
                    and sfkwargs.get('subtype', None) is None):
                sfkwargs['subtype'] = inp_fh.subtype

        if sfkwargs.get('channels', None) is None:
            # duplex streams (kind None) report (input, output) channel pairs
            sfkwargs['channels'] = self.channels[0] if kind is None else self.channels
        if sfkwargs.get('samplerate', None) is None:
            sfkwargs['samplerate'] = int(self.samplerate)
        if sfkwargs.get('mode', None) is None:
            sfkwargs['mode'] = 'w+b'

        return sf.SoundFile(outf, **sfkwargs)

    def close(self):
        """Close the stream, then any file that was opened by this object."""
        try:
            super(SoundFileStreamBase, self).close()
        finally:
            try:
                if self.out_fh is not self._outf:
                    self.out_fh.close()
            finally:
                if self.inp_fh is not self._inpf:
                    self.inp_fh.close()
class SoundFileInputStream(SoundFileStreamBase):
    """
    Input-only (kind='input') variant of SoundFileStreamBase bound to the
    output file `outf`.

    `sfkwargs` holds optional arguments for opening the output file; a copy
    is passed on so neither the caller's dict nor a shared default is
    modified.
    """

    def __init__(self, outf, sfkwargs=None, **kwargs):
        # Only forwarded when the base class will open the file itself; in
        # every other case the options are unused and must not travel on
        # towards the PortAudio stream.
        if outf is not None and not isinstance(outf, sf.SoundFile):
            kwargs['sfkwargs'] = {} if sfkwargs is None else dict(sfkwargs)
        super(SoundFileInputStream, self).__init__(outf=outf, kind='input', **kwargs)
class SoundFileOutputStream(SoundFileStreamBase):
    """
    Output-only (kind='output') variant of SoundFileStreamBase bound to the
    input file `inpf`.

    Parameters
    ----------
    loop : bool or int
        Number of times the file is replayed after the first pass; True (or
        a negative number) loops forever.
    qsize : int
        Transmit queue size in frames; must be positive for infinite looping.
    fileblocksize : int, optional
        Frames per file read.

    Raises
    ------
    ValueError
        For infinite looping with an unbounded queue, or when looping is
        requested on a file that is not seekable.
    """

    def __init__(self, inpf, loop=False, qsize=TXQSIZE, fileblocksize=None, **kwargs):
        loop = -1 if loop is True else int(loop)
        if loop < 0 and qsize <= 0:
            raise ValueError("Must choose a positive finite qsize for infinite loop mode.")

        super(SoundFileOutputStream, self).__init__(inpf=inpf, qsize=qsize,
                                                    kind='output',
                                                    fileblocksize=fileblocksize,
                                                    **kwargs)

        # seekable is a method; the bare attribute is always truthy, which
        # made this check unreachable.
        if loop != 0 and not self.inp_fh.seekable():
            # Release the stream and file opened above before bailing out.
            self.close()
            raise ValueError("Loop mode specified but input file is not seekable.")
        self.loop = loop
class SoundFileStream(SoundFileStreamBase):
    """
    Duplex stream (kind=None) that plays `inpf` and/or records to `outf`; at
    least one of the two must be given.

    Parameters
    ----------
    loop : bool or int
        Number of times the input file is replayed after the first pass;
        True (or a negative number) loops forever.
    qsize : int
        Transmit queue size in frames; must be positive for infinite looping.
    sfkwargs : dict, optional
        Extra arguments for opening the output file (copied before use).
    fileblocksize : int, optional
        Frames per file read.

    Raises
    ------
    ValueError
        If no file is given, for infinite looping with an unbounded queue, or
        when looping is requested on an input file that is not seekable.
    """

    def __init__(self, inpf=None, outf=None, loop=False, qsize=TXQSIZE, sfkwargs=None, fileblocksize=None, **kwargs):
        if inpf is None and outf is None:
            raise ValueError("No input or output file given.")

        loop = -1 if loop is True else int(loop)
        if loop < 0 and qsize <= 0:
            raise ValueError("Must choose a positive finite qsize for infinite loop mode.")

        # Previously accepted but dropped. Only forwarded when the base class
        # will open the output file itself; otherwise the options are unused
        # and must not travel on towards the PortAudio stream.
        if outf is not None and not isinstance(outf, sf.SoundFile):
            kwargs['sfkwargs'] = {} if sfkwargs is None else dict(sfkwargs)

        super(SoundFileStream, self).__init__(inpf=inpf, outf=outf, qsize=qsize,
                                              fileblocksize=fileblocksize,
                                              kind=None, **kwargs)

        # seekable is a method, and there may be no input file at all.
        if loop != 0 and self.inp_fh is not None and not self.inp_fh.seekable():
            # Release the stream and files opened above before bailing out.
            self.close()
            raise ValueError("Loop mode specified but input file is not seekable.")
        self.loop = loop
def blockstream(inpf=None, blocksize=1024, overlap=0, always_2d=False, copy=False, **kwargs):
    """
    Generate (optionally overlapping) numpy blocks of received audio.

    Parameters
    ----------
    inpf : optional
        Playback file; if given a duplex SoundFileStream is used, otherwise a
        QueuedInputStream.
    blocksize : int
        Number of frames in every yielded block.
    overlap : int
        Number of frames shared between consecutive blocks
        (0 <= overlap < blocksize).
    always_2d : bool
        Yield (frames, channels) arrays even for a single channel. Forced on
        for multichannel streams.
    copy : bool
        Yield an independent copy of the block. If False the same internal
        buffer is yielded every time and is overwritten on the next iteration.
    **kwargs
        Passed on to the stream constructor.

    Raises
    ------
    ValueError
        If `overlap` is outside [0, blocksize).
    queue.Empty
        If no data arrives within one second.
    """
    import numpy as np

    assert blocksize is not None and blocksize > 0
    if not 0 <= overlap < blocksize:
        raise ValueError("overlap must satisfy 0 <= overlap < blocksize.")

    # Each stream block contributes only the non-overlapping part.
    incframes = blocksize-overlap
    if inpf is None:
        stream = QueuedInputStream(blocksize=incframes, **kwargs)
        dtype = stream.dtype
        channels = stream.channels
    else:
        stream = SoundFileStream(inpf=inpf, blocksize=incframes, **kwargs)
        dtype = stream.dtype[0]
        channels = stream.channels[0]

    rxqueue = stream.rxq

    if channels > 1:
        always_2d = True

    outbuff = np.zeros((blocksize, channels) if always_2d else blocksize, dtype=dtype)

    with stream:
        while True:
            try:
                item = rxqueue.get(timeout=1)
            except queue.Empty:
                raise queue.Empty("Timed out waiting for data.")

            if item is None:
                break

            # The queue item is a flat byte buffer: restore the channel axis
            # and only fill as many frames as were actually received (the
            # last block may be short).
            rxbuff = np.frombuffer(item, dtype=dtype)
            if always_2d:
                rxbuff = rxbuff.reshape(len(rxbuff)//channels, channels)
            outbuff[overlap:overlap+len(rxbuff)] = rxbuff

            # outbuff[:] would only be another view of the same memory.
            yield outbuff.copy() if copy else outbuff

            # Slide the tail of this block to the front for the next one.
            outbuff[:-incframes] = outbuff[incframes:]
def SoundFileStreamFactory(inpf=None, outf=None, **kwargs):
    """
    Build the stream class matching the files that were supplied.

    Both files select `SoundFileStream`, only `inpf` selects
    `SoundFileOutputStream` and only `outf` selects `SoundFileInputStream`.
    Keyword arguments that are removed for the single-direction classes are
    dropped silently before the constructor is called.

    Raises
    ------
    SystemExit
        If neither `inpf` nor `outf` is given.
    """
    have_input = inpf is not None
    have_output = outf is not None

    if not (have_input or have_output):
        raise SystemExit("No input or output selected.")

    if have_input and have_output:
        return SoundFileStream(inpf, outf, **kwargs)

    if have_input:
        stream_class, target = SoundFileOutputStream, inpf
        dropped = ('sfkwargs', 'qreader')
    else:
        stream_class, target = SoundFileInputStream, outf
        dropped = ('loop', 'qsize', 'fileblocksize', 'qwriter')

    for key in dropped:
        kwargs.pop(key, None)
    return stream_class(target, **kwargs)
def get_parser(parser=None):
    """
    Build the command line argument parser.

    Parameters
    ----------
    parser : argparse.ArgumentParser, optional
        Existing parser to add the arguments to. When omitted a new parser
        is created that uses the module docstring as its description and
        reads whitespace separated arguments from '@file' references.

    Returns
    -------
    argparse.ArgumentParser
        The parser with all arguments registered.
    """
    from argparse import (Action, ArgumentParser, ArgumentTypeError,
                          RawDescriptionHelpFormatter)

    if parser is None:
        parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
                                fromfile_prefix_chars='@',
                                description=__doc__)
        # Allow several arguments per line in '@file' argument files
        parser.convert_arg_line_to_args = lambda arg_line: arg_line.split()

    class ListStreamsAction(Action):
        """Print the available audio devices and exit."""
        def __call__(*args, **kwargs):
            print(sd.query_devices())
            sys.exit(0)

    def devtype(dev):
        # Devices can be given by index or by name expression
        try:
            return int(dev)
        except ValueError:
            return dev

    def posint(intarg):
        # Raise an argparse error instead of asserting: assert statements
        # are stripped under -O and would surface as a raw traceback.
        intarg = int(intarg)
        if intarg <= 0:
            raise ArgumentTypeError("Queue size must be a positive value.")
        return intarg

    parser.add_argument("input", type=lambda x: None if x=='-' else x,
                        help='''\
Input audio file, or, use the special designator '-' for recording only.''')

    parser.add_argument("output", type=lambda x: None if x=='-' else x,
                        help='''\
Output recording file, or, use the special designator '-' for playback only.''')

    parser.add_argument("--loop", default=False, nargs='?', metavar='n', const=True, type=int,
                        help='''\
Replay the playback file n times. If no argument is specified, playback will
loop infinitely. Does nothing if there is no playback.''')

    parser.add_argument("-l", action=ListStreamsAction, nargs=0,
                        help="List available audio device streams.")

    parser.add_argument("-q", "--qsize", type=posint, default=TXQSIZE,
                        help="File transmit buffer size (in units of frames). Increase for smaller blocksizes.")

    devopts = parser.add_argument_group("I/O device stream options")

    devopts.add_argument("-d", "--device", type=devtype,
                         help='''\
Audio device name expression or index number. Defaults to the PortAudio default device.''')

    devopts.add_argument("-b", "--blocksize", type=int, default=2048,
                         help="PortAudio buffer size and file block size (in units of frames).")

    devopts.add_argument("-B", "--file-blocksize", type=int,
                         help='''\
Only used for special cases where you want to set the file block size
differently than the portaudio buffer size. You most likely don't need to use
this option. (In units of frames).''')

    # choices must be real containers: argparse tests membership and renders
    # the help text from them, and a one-shot iterator (what map() returns
    # on Python 3) is exhausted after its first use.
    devopts.add_argument("-f", "--format", dest='dtype',
                         choices=list(sd._sampleformats),
                         help='''Sample format of device I/O stream.''')

    devopts.add_argument("-r", "--rate", dest='samplerate',
                         type=lambda x: int(float(x[:-1])*1000) if x.endswith('k') else int(x),
                         help="Sample rate in Hz. Add a 'k' suffix to specify kHz.")

    fileopts = parser.add_argument_group('''\
Output file format options''')

    fileopts.add_argument("-t", dest="file_type", metavar='FILE_TYPE',
                          choices=[fmt.lower() for fmt in sf.available_formats()],
                          help='''\
Output file type. Typically this is determined from the file extension, but it
can be manually overridden here.''')

    # fileopts.add_argument("-c", "--channels", type=int,
    #                       help='''Number of output channels.''')

    fileopts.add_argument("--endian", choices=['file', 'big', 'little'],
                          help="Byte endianness.")

    fileopts.add_argument("-s", "--subtype",
                          choices=[sub.lower() for sub in sf.available_subtypes()],
                          help="Sample format encoding.")

    return parser
def main(argv=None):
    """
    Command line entry point: stream the requested files and show progress.

    Parameters
    ----------
    argv : list of str, optional
        Command line arguments. Defaults to ``sys.argv[1:]``.

    Returns
    -------
    int
        Always 0. A keyboard interrupt ends the streaming loop quietly.
    """
    if argv is None:
        argv = sys.argv[1:]
    parser = get_parser()
    args = parser.parse_args(argv)

    sfkwargs = dict(endian=args.endian, subtype=args.subtype, format=args.file_type)
    stream = SoundFileStreamFactory(args.input, args.output, loop=args.loop,
                                    qsize=args.qsize, sfkwargs=sfkwargs,
                                    device=args.device, dtype=args.dtype,
                                    samplerate=args.samplerate,
                                    blocksize=args.blocksize,
                                    fileblocksize=args.file_blocksize)

    # 'r': the stream has an output file handle (out_fh),
    # 'w': the stream has an input file handle (inp_fh), 'rw': both.
    if isinstance(stream, SoundFileInputStream):
        mode = 'r'
    elif isinstance(stream, SoundFileOutputStream):
        mode = 'w'
    else:
        mode = 'rw'

    statline = "\r{:8.3f}s {:10d} frames processed, {:>10s} frames queued, {:>10s} frames received\r"
    print("<-", stream.inp_fh if 'w' in mode else 'null')
    print("--", stream)
    print("->", stream.out_fh if 'r' in mode else 'null')

    # Placeholders for the counters that cannot be reported in this mode
    nullinp = nullout = None
    if 'w' not in mode:
        nullinp = 'n/a'
    if not ('r' in mode and stream.out_fh.seekable()):
        nullout = 'n/a'

    try:
        with stream:
            t1 = time.time()
            while stream.active:
                time.sleep(0.1)
                line = statline.format(time.time()-t1, stream.frame_count,
                                       nullinp or str(stream.txq.qsize()*stream.fileblocksize),
                                       nullout or str(stream.out_fh.tell()))
                sys.stdout.write(line)
                sys.stdout.flush()
    except KeyboardInterrupt:
        pass
    finally:
        # Move past the status line, which is rewritten in place with '\r'
        print()

    return 0
# Script entry point: use the return value of main() as the exit status.
if __name__ == '__main__':
    raise SystemExit(main())
"""
Loopback tests for pastream.
"""
import os, sys
import numpy as np
import soundfile as sf
import pytest
import numpy.testing as npt
import time
import tempfile
import pastream
# Number of frames read per block by the file comparison helpers.
BLOCKSIZE = 2048

# Parameter values fed to the random wav file fixtures.
TEST_LENGTHS = [5]

# Element-wise formatter producing '0x' prefixed hex strings of width 10.
vhex = np.vectorize('{:#10x}'.format)


def tohex(x):
    """Return the elements of `x`, reinterpreted as uint32, as hex strings."""
    return vhex(x.view('u4'))
def qreader_compare(stream, rxq):
    """
    Receive queue reader that checks looped back audio against the input file.

    Leading all-zero frames are skipped; from the first non-zero frame on,
    every received block is compared bit for bit with frames read from a
    second handle on the stream's input file. On a mismatch
    ``stream.set_exception()`` is called and reading stops. Reading also
    stops when ``None`` is taken from the queue.

    Parameters
    ----------
    stream
        Stream under test. Its ``dtype``, ``channels`` and ``inp_fh``
        attributes are used.
    rxq
        Queue delivering the received audio buffers.

    Raises
    ------
    queue.Empty
        If no data arrives within one second.
    """
    # Imported here because this module never imports queue at the top
    try:
        import queue
    except ImportError:  # Python 2
        import Queue as queue

    dtype = stream.dtype[0]
    found_delay = False

    # Open an independent handle on the input file and make sure it is
    # closed again however the loop is left.
    with sf.SoundFile(stream.inp_fh.name.name, mode='rb') as inpf2:
        while True:
            try:
                item = rxq.get(timeout=1)
            except queue.Empty:
                raise queue.Empty("Timed out waiting for data.")
            if item is None:
                break

            outframes = np.frombuffer(item, dtype=dtype).reshape(-1, stream.channels[1])

            if not found_delay:
                if outframes.any():
                    found_delay = True
                    # Align on the first frame with signal on any channel.
                    # Looking at channel 0 alone raises IndexError when only
                    # other channels are non-zero in this block.
                    nonzeros = np.where(outframes.any(axis=1))[0]
                    outframes = outframes[nonzeros[0]:]

            if found_delay:
                inframes = inpf2.read(len(outframes), dtype=dtype, always_2d=True)

                # if not allow_truncation:
                #     assert len(inframes) == len(outframes), "Some samples were dropped"
                mlen = min(len(inframes), len(outframes))
                inp = inframes[:mlen].view('u4')
                out = outframes[:mlen].view('u4')

                try:
                    npt.assert_array_equal(inp, out, "Loopback data mismatch")
                except AssertionError:
                    stream.set_exception()
                    break
def sf_find_delay(xf, mask=0xFFFF0000, chan=None):
    """
    Find the first frame of a sound file that is non-zero under `mask`.

    The file is scanned from its current position in blocks of `BLOCKSIZE`
    frames and the position is restored before returning.

    Parameters
    ----------
    xf
        Open sound file to scan.
    mask : int
        Bit mask applied to every 32-bit sample before testing for non-zero.
    chan : int, optional
        Channel to inspect. All channels are inspected when omitted.

    Returns
    -------
    int
        Offset of the first matching frame relative to the starting
        position, or -1 if there is none.
    """
    pos = xf.tell()
    off = -1
    try:
        inpblocks = xf.blocks(BLOCKSIZE, dtype='int32', always_2d=True)
        for i, inpblk in enumerate(inpblocks):
            # Mask on the unsigned view: the default mask does not fit in a
            # signed 32-bit integer.
            nonzeros = np.where(inpblk.view('u4')[:, chan] & mask)
            # Test for the presence of hits, not the truth of the indices:
            # a single hit at frame 0 is a hit too.
            if nonzeros[0].size:
                off = i*BLOCKSIZE + int(nonzeros[0][0])
                break
    finally:
        xf.seek(pos)
    return off
def sf_assert_equal(tx_fh, rx_fh, mask=0xFFFF0000, chi=None, chf=None, allow_truncation=False, allow_delay=False):
    """
    Assert that a received sound file matches the transmitted one.

    The start of the signal in `rx_fh` is located with `sf_find_delay`, then
    both files are read in blocks of `BLOCKSIZE` frames and compared after
    applying `mask` to every 32-bit sample.

    Parameters
    ----------
    tx_fh, rx_fh
        Open sound files holding the transmitted and received audio.
    mask : int
        Bit mask applied to every sample before comparing.
    chi, chf : int, optional
        Start and stop of the channel slice that is compared. `chi` is also
        the channel used for locating the start of the signal.
    allow_truncation : bool
        Accept a received file that is shorter than the transmitted one.
    allow_delay : bool
        Accept a received signal that does not start at frame 0.

    Raises
    ------
    AssertionError
        If no signal is found, or if the files differ in a way that is not
        allowed by the flags above.
    """
    d = sf_find_delay(rx_fh, mask=mask, chan=chi)

    assert d != -1, "Test Preamble pattern not found"
    if not allow_delay:
        assert d == 0, "Signal delayed by %d frames" % d
    rx_fh.seek(d)

    # Always request 2-D blocks so the channel slicing below also works for
    # single channel files.
    inpblocks = tx_fh.blocks(BLOCKSIZE, dtype='int32', always_2d=True)
    for inpblk in inpblocks:
        outblk = rx_fh.read(BLOCKSIZE, dtype='int32', always_2d=True)

        if not allow_truncation:
            assert len(inpblk) == len(outblk), "Some samples were dropped"
        mlen = min(len(inpblk), len(outblk))

        inp = inpblk[:mlen, chi:chf].view('u4') & mask
        out = outblk[:mlen, chi:chf].view('u4') & mask

        npt.assert_array_equal(inp, out, "Loopback data mismatch")
class PortAudioLoopbackTester(object):
    """
    Base class for loopback tests.

    Provides fixtures that create 8 channel wav files filled with a constant
    preamble followed by random samples, and a helper that plays such a file
    through a `pastream.SoundFileStream`. Subclasses supply the stream
    options in a ``devargs`` class attribute.
    """

    def _gen_random(self, rdm_fh, nrepeats, nbytes):
        """
        Write a preamble and `nrepeats` random patterns to `rdm_fh`.

        Samples only use the upper `nbytes` bytes of each 32-bit word. The
        preamble is ``samplerate // 10`` frames of the largest positive
        value; every pattern is ``samplerate`` frames long.
        """
        shift = 8*(4-nbytes)
        minval = -(0x80000000>>shift)
        maxval = 0x7FFFFFFF>>shift

        preamble = np.zeros((rdm_fh.samplerate//10, rdm_fh.channels), dtype=np.int32)
        preamble[:] = maxval << shift
        rdm_fh.write(preamble)

        for _ in range(nrepeats):
            pattern = np.random.randint(minval, maxval+1, (rdm_fh.samplerate, rdm_fh.channels), dtype=np.int32) << shift
            rdm_fh.write(pattern.astype(np.int32))

    def _random_wav(self, tmpdir_factory, nrepeats, samplerate, subtype, nbytes):
        """
        Yield a temporary random wav file, rewound to its first frame.

        Both the sound file and the temporary file backing it are closed
        once the generator is resumed or closed.
        """
        tmpdir = tmpdir_factory.getbasetemp()
        with tempfile.NamedTemporaryFile('w+b', dir=str(tmpdir)) as rdmf:
            with sf.SoundFile(rdmf, 'w+', samplerate, 8, subtype, format='wav') as rdm_fh:
                self._gen_random(rdm_fh, nrepeats, nbytes)
                rdm_fh.seek(0)
                yield rdm_fh

    @pytest.fixture(scope='session', params=TEST_LENGTHS)
    def randomwav84832(self, request, tmpdir_factory):
        """8 channel, 48 kHz, 32-bit random wav file."""
        for rdm_fh in self._random_wav(tmpdir_factory, request.param, 48000, 'PCM_32', 4):
            yield rdm_fh

    @pytest.fixture(scope='session', params=TEST_LENGTHS)
    def randomwav84824(self, request, tmpdir_factory):
        """8 channel, 48 kHz, 24-bit random wav file."""
        for rdm_fh in self._random_wav(tmpdir_factory, request.param, 48000, 'PCM_24', 3):
            yield rdm_fh

    @pytest.fixture(scope='session', params=TEST_LENGTHS)
    def randomwav84816(self, request, tmpdir_factory):
        """8 channel, 48 kHz, 16-bit random wav file."""
        for rdm_fh in self._random_wav(tmpdir_factory, request.param, 48000, 'PCM_16', 2):
            yield rdm_fh

    @pytest.fixture(scope='session', params=TEST_LENGTHS)
    def randomwav84432(self, request, tmpdir_factory):
        """8 channel, 44.1 kHz, 32-bit random wav file."""
        for rdm_fh in self._random_wav(tmpdir_factory, request.param, 44100, 'PCM_32', 4):
            yield rdm_fh

    @pytest.fixture(scope='session', params=TEST_LENGTHS)
    def randomwav84416(self, request, tmpdir_factory):
        """8 channel, 44.1 kHz, 16-bit random wav file."""
        for rdm_fh in self._random_wav(tmpdir_factory, request.param, 44100, 'PCM_16', 2):
            yield rdm_fh

    def assert_stream_equal(self, inp_fh, qreader=qreader_compare, **kwargs):
        """
        Stream `inp_fh` and wait until the stream is no longer active.

        Parameters
        ----------
        inp_fh
            Input file passed to `pastream.SoundFileStream`.
        qreader : callable
            Receive queue reader passed to the stream.
        **kwargs
            Stream options overriding the class level ``devargs``.
        """
        devargs = dict(self.devargs)
        devargs.update(kwargs)

        # Pass on the reader that was asked for, not a hard-coded one
        with pastream.SoundFileStream(inp_fh, qreader=qreader, **devargs) as stream:
            while stream.active:
                time.sleep(0.1)
class TestALSALoopback(PortAudioLoopbackTester):
    """Loopback tests run against the 'aduplex' device."""

    devargs = {'fileblocksize': 512, 'device': 'aduplex'}

    def test_wav24(self, tmpdir, randomwav84824):
        """Stream the 24-bit test file using 32-bit samples."""
        randomwav84824.seek(0)
        self.assert_stream_equal(randomwav84824, dtype='int32')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment