Created
March 2, 2012 03:19
-
-
Save pedramamini/1955308 to your computer and use it in GitHub Desktop.
Speech Echo Proof-of-Concept
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Read the following in an interesting article today: | |
" Psychologists have known for some years that it is almost impossible to speak when your words are replayed to you | |
with a delay of a fraction of a second. " | |
Source: http://www.technologyreview.com/blog/arxiv/27620/ | |
I found it interesting and wanted to hack together a proof-of-concept for myself. Two children are spawned, one for | |
listening and the other for repeating what was heard. Data is shared between processes via shared memory queue. The | |
article calls for a 0.2 second delay between the voice echo. That value may have to be tinkered with here to compensate | |
for natural processing delay. Perhaps the best way to tune it would be to record the in/out stream and try to visually | |
measure the distance between the original and the echo. | |
Requires Python 2.6+ and PyAudio (http://people.csail.mit.edu/hubert/pyaudio/). | |
""" | |
import multiprocessing | |
import time | |
import wave | |
import sys | |
# external dependency. | |
import pyaudio | |
REPEAT_DELAY = 0.2 | |
RATE = 44100 | |
CHUNK = 1024 | |
FORMAT = pyaudio.paInt16 | |
CHANNELS = 1 | |
DURATION = 60 | |
OUTPUT_WAV = None | |
Q = multiprocessing.Queue() | |
SENTINEL = "TERMINATE" | |
######################################################################################################################## | |
def listener (stream): | |
""" | |
Listen on microphone for DURATION seconds. If OUTPUT_WAV is set then audio are queued in memory and written as a WAV | |
file to disk after DURATION expires. Audio chunks are placed on the multiprocessing Q. | |
""" | |
all = [] | |
# loop through chunks dependant on rate and recording time. | |
for i in range(0, RATE / CHUNK * DURATION): | |
# hear a chunk. | |
data = stream.read(CHUNK) | |
# repeat a chunk. | |
Q.put(data) | |
# save a chunk. | |
if OUTPUT_WAV: | |
all.append(data) | |
# let the repeater know there is nothing left to work on. | |
Q.put(SENTINEL) | |
# write audio chunks to WAV file. | |
if all: | |
wf = wave.open(OUTPUT_WAV, "wb") | |
wf.setnchannels(CHANNELS) | |
wf.setsampwidth(p.get_sample_size(FORMAT)) | |
wf.setframerate(RATE) | |
wf.writeframes("".join(all)) | |
wf.close() | |
######################################################################################################################## | |
def repeater (stream): | |
""" | |
Loop through delaying for 0.2 seconds and replaying audio chunks off of the multiprocessing queue. | |
""" | |
# keep picking up data until the sentinel is reached. | |
for data in iter(Q.get, SENTINEL): | |
# wait. | |
time.sleep(REPEAT_DELAY) | |
# echo. | |
stream.write(data, CHUNK) | |
######################################################################################################################## | |
if __name__ == "__main__": | |
children = [] | |
# open audio stream. | |
p = pyaudio.PyAudio() | |
stream = p.open(format = FORMAT, | |
channels = CHANNELS, | |
rate = RATE, | |
input = True, | |
output = True, | |
frames_per_buffer = CHUNK) | |
# queue up the kids. | |
for target in [listener, repeater]: | |
child = multiprocessing.Process(target=target, name=target.__name__, args=[stream]) | |
children.append(child) | |
# spawn them out. | |
for child in children: | |
child.start() | |
print "started child %s PID: %s" % (child.name, child.pid) | |
# wait for children to exit. | |
while 1: | |
# if not children are working, break out of the loop. | |
if True not in [child.is_alive() for child in children]: | |
break | |
time.sleep(1) | |
# close audio stream. | |
stream.close() | |
p.terminate() | |
print "done." |
This is an old PoC script that requires 2.6. I don't intend on maintaining it but if you do make updates for it to work, please do pass them back and I'll update the Gist and provide you credit.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi,
There is multiprocess error in Python 3.x, MSYS2 Mingw-64, Windows 10