Skip to content

Instantly share code, notes, and snippets.

@sturlamolden
Last active December 21, 2015 19:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sturlamolden/6353279 to your computer and use it in GitHub Desktop.
Save sturlamolden/6353279 to your computer and use it in GitHub Desktop.
Example of audio processing in Windows with Python 2.7 using NumPy and pywin32 (Microsoft DirectSound)
""" Copyright (C) 2013 Sturla Molden """
import threading
import Queue
import pywintypes
import win32event
import win32com.directsound.directsound as directsound
import numpy as np
class BufferDescriptor(object):
""" Buffer descriptor for directsound """
def __init__(self, milliseconds=100):
wfxFormat = pywintypes.WAVEFORMATEX()
wfxFormat.wFormatTag = pywintypes.WAVE_FORMAT_PCM
wfxFormat.nChannels = 2
wfxFormat.nSamplesPerSec = 44100
wfxFormat.nAvgBytesPerSec = 176400
wfxFormat.nBlockAlign = 4
wfxFormat.wBitsPerSample = 16
self.format = wfxFormat
self.size = 4*int((44100*milliseconds)/1000.)
self.milliseconds = milliseconds
self.shape = (self.size//4, 2)
self.dtype = np.int16
self.Fs = 44100
class DirectSoundPlayer(threading.Thread):
""" A thread that writes spike waveforms to the speaker"""
def __init__(self, descriptor, queue, abort):
"""
descriptor: BufferDescriptor
queue: Queue.Queue with playback buffers as np.ndarray
abort: threading.Event as abourt signal
"""
threading.Thread.__init__(self)
d = directsound.DirectSoundCreate(None, None)
d.SetCooperativeLevel(None, directsound.DSSCL_PRIORITY)
sdesc = directsound.DSBUFFERDESC()
sdesc.dwFlags = directsound.DSBCAPS_STICKYFOCUS | directsound.DSBCAPS_CTRLPOSITIONNOTIFY
sdesc.dwBufferBytes = descriptor.size
sdesc.lpwfxFormat = descriptor.format
_buffer = d.CreateSoundBuffer(sdesc, None)
event = win32event.CreateEvent(None, 0, 0, None)
notify = _buffer.QueryInterface(directsound.IID_IDirectSoundNotify)
notify.SetNotificationPositions((directsound.DSBPN_OFFSETSTOP, event))
self.device = d
self.sdesc = sdesc
self.descriptor = descriptor
self.buffer = _buffer
self.event = event
self.notify = notify
self.abort = abort
self.queue = queue
self.timeout = 0.5 * descriptor.milliseconds / 1000.
def run(self):
self.array = None
while not self.abort.isSet():
try:
data = self.queue.get(block=True, timeout=self.timeout)
#print data.shape, data.dtype
self.buffer.Update(0, data.tostring()) # data is a numpy array
self.buffer.Play(0)
win32event.WaitForSingleObject(self.event, -1)
except Queue.Empty:
# nothing to play
# print 'empty'
pass
self.array = data
class DirectSoundRecorder(threading.Thread):
""" A thread that continously reads chunks of data from
line-in or the microphone. """
def __init__(self, descriptor, queue, abort):
"""
descriptor: BufferDescriptor
queue: Queue.Queue for writing data as np.ndarray
abort: threading.Event as abourt signal
"""
threading.Thread.__init__(self)
d = directsound.DirectSoundCaptureCreate(None, None)
sdesc = directsound.DSCBUFFERDESC()
sdesc.dwBufferBytes = descriptor.size
sdesc.lpwfxFormat = descriptor.format
_buffer = d.CreateCaptureBuffer(sdesc)
event = win32event.CreateEvent(None, 0, 0, None)
notify = _buffer.QueryInterface(directsound.IID_IDirectSoundNotify)
notify.SetNotificationPositions((directsound.DSBPN_OFFSETSTOP, event))
self.device = d
self.sdesc = sdesc
self.descriptor = descriptor
self.buffer = _buffer
self.event = event
self.notify = notify
self.abort = abort
self.queue = queue
self.timeout = 2 * descriptor.milliseconds / 1000.
def run(self):
self.array = None
while not self.abort.isSet():
self.buffer.Start(0)
win32event.WaitForSingleObject(self.event, -1)
data = self.buffer.Update(0, self.descriptor.size)
try:
array = np.frombuffer(data, dtype=self.descriptor.dtype).reshape(self.descriptor.shape)
self.queue.put( array.copy(), block=True, timeout=self.timeout )
except Queue.Full:
# we might loose data on full play-back queue
# print 'full'
pass
self.array = array
if __name__ == '__main__':
from time import sleep
desc = BufferDescriptor(milliseconds=1000)
queue = Queue.Queue(maxsize=100)
abort_event = threading.Event()
recorder = DirectSoundRecorder(desc, queue, abort_event)
player = DirectSoundPlayer(desc, queue, abort_event)
print "recording"
abort_event.clear()
recorder.start()
sleep(1)
print "delayed playback"
abort_event.clear()
player.start()
sleep(5)
abort_event.set()
player.join()
recorder.join()
import matplotlib
import matplotlib.pyplot as plt
plt.figure()
plt.plot(recorder.array[:,0],'b')
plt.plot(recorder.array[:,1],'r')
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment