@avaitla
Created March 21, 2013 00:32
Here is a simple setup that does command recognition and training. It depends on pyaudio, numpy, and scipy, and was motivated by http://xa.yimg.com/kq/groups/24321415/1523383180/name/Speech_Recognition_seminar.pdf. MFCC.py is the MFCC extraction step, audio.py is the main classification system, and Main.py is what you want to run on the command line; it does trai…
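For orientation, a minimal sketch of how the pieces compose, assuming `train_clips` (a hypothetical dict mapping gesture names to lists of raw 16-bit PCM byte strings) and `test_clip` (one such byte string) were captured elsewhere, e.g. via the pyaudio loop in Main.py:

# Hypothetical sketch: train_clips maps gesture names to lists of raw
# 16-bit PCM byte strings; test_clip is one such byte string to label.
import audio

params = audio.GenerateParams(train_clips, verbose=False)   # per-gesture MFCC templates
classifier = audio.AudioClassifier(params)
label = classifier.Classify(test_clip, verbose=False)       # best-matching gesture name, or None
print label

# ---------------------------------------------------------------------------
# audio.py -- the main classification system
# ---------------------------------------------------------------------------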
import MFCC, numpy
import scipy.spatial.distance as dist

THRESHOLD = 12

class AudioClassifier():

    def __init__(self, params):
        self.params = params

    def Classify(self, sample, verbose=True):
        length = len(sample)
        features = MFCC.extract(numpy.frombuffer(sample, numpy.int16))
        gestures = {}
        for gesture in self.params:
            d = []
            for tsample in self.params[gesture]:
                total_distance = 0
                smpl_length = len(tsample)
                # Skips this template only when the lengths are exactly equal
                # (abs(diff) <= 0 holds only for equality)
                if (numpy.abs(length - smpl_length) <= 0):
                    continue
                # Frame-by-frame city-block distance against the template
                for i in range(min(len(features), len(tsample))):
                    total_distance += dist.cityblock(features[i], tsample[i])
                # Average per-frame distance (uses the last loop index i as the divisor)
                d.append(total_distance / float(i))
            # Best (lowest) distance over all templates for this gesture
            score = numpy.min(d)
            gestures[gesture] = score
            if verbose:
                print "Gesture %s: %f" % (gesture, score)
            # Track the gesture with the overall lowest score
            try:
                if score < minimum:
                    minimum = score
                    lowest = gesture
            except NameError:
                minimum = score
                lowest = gesture
        if verbose:
            print lowest, minimum
        if minimum < THRESHOLD:
            return lowest
        else:
            return None

def GenerateParams(gestures, verbose=True):
    params = {}
    for gesture in gestures:
        if verbose:
            print "Processing " + gesture
        l = []
        for sample in gestures[gesture]:
            l.append(MFCC.extract(numpy.frombuffer(sample, numpy.int16)))
        params[gesture] = l
    return params
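
# ---------------------------------------------------------------------------
# Main.py -- command-line driver: recording, training, classification, testing
# ---------------------------------------------------------------------------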
#!/usr/bin/python
import pyaudio
import wave
import audioop
import cPickle
import audio
import pdb
import os
import sys
import socket
from network import HOST, PORT

CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 2
RATE = 16000
RECORD_SECONDS = 5
THRESHOLD = 1000

try:
    username = sys.argv[1]
except:
    username = "default"

try:
    method = sys.argv[2]
except:
    method = "default"

verbose = True
p = pyaudio.PyAudio()

def StreamBuild(channels):
    return p.open(format=FORMAT, channels=channels, rate=RATE, input=True,
                  frames_per_buffer=CHUNK)

stream = ""
try:
    stream = StreamBuild(CHANNELS)
except IOError as e:
    print "Found Exception: " + str(e) + ". Trying 1 Channel"
    CHANNELS = 1
    stream = StreamBuild(CHANNELS)

out_stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, output=True)
def main(user="default", method="default"):
    global db, username
    try:
        db = cPickle.load(open("database", "rb"))
    except:
        db = {}
    if (method == "live"):
        run_classify(user, False)
    if (user == "default"):
        username = raw_input("Username: ")
        if (not db.has_key(username)):
            db[username] = {"gestures": {}, "trained": False, "no_class_audio": []}
    else:
        username = user
        if (not db.has_key(username)):
            db[username] = {"gestures": {}, "trained": False, "no_class_audio": []}

    gestlist = ""
    for gesture in db[username]["gestures"].keys():
        gestlist += (gesture + ", ")
    print ("Welcome %s, you have loaded the following gestures: %s" %
           (username, gestlist.strip(", ")))
    if db[username]["trained"]:
        print "The system has been trained since you last recorded."
    else:
        print "The system has not been trained since you last recorded."

    if (method == "default"):
        print "Menu: \t1) Record Gestures"
        print "\t2) Train System"
        print "\t3) Run Classifier"
        print "\t4) Listen to Gestures"
        print "\t5) Save Audio to File"
        print "\t6) View Stats for User"
        print "\t7) Run Testing Module"
        print "\t8) Quit"
        method = input("Choose option: ")
        print ""

    ret = 1
    if (method == 1):
        printv("Starting recording system")
        run_record(username)
        printv("Finished recording system")
    elif (method == 2):
        printv("Starting training system")
        run_train(username)
        printv("Finished training system")
    elif (method == 3):
        printv("Starting classification system")
        run_classify(username)
        printv("Finished classification system")
    elif (method == 4):
        printv("Starting playback system")
        run_play(username)
        printv("Finished playback system")
    elif (method == 5):
        printv("Saving Audio To Files")
        run_save_audio(username)
        printv("Finished Saving Audio")
    elif (method == 6):
        printv("Viewing User Info")
        run_view_avail_gestures(username)
        printv("Finished Viewing User Info")
    elif (method == 7):
        printv("Starting Test Sequence")
        run_test_sequence(username)
        printv("Ending Test Sequence")
    else:
        ret = 0

    f = open("database", "wb")
    cPickle.dump(db, f)
    f.close()
    print "------------------------------------------------------------"
    return (ret)
def run_record(username):
    global db
    gesturename = raw_input("Gesture name: ")
    # A leading '-' deletes the named gesture instead of recording into it
    if (gesturename[0] == '-'):
        db[username]["gestures"].pop(gesturename[1:])
        return
    if (not db[username]["gestures"].has_key(gesturename)):
        db[username]["gestures"][gesturename] = []
    db[username]["trained"] = False
    Run_On_Every_Frame(lambda frames: db[username]["gestures"][gesturename].append(frames))
    for gesture in db[username]["gestures"][gesturename]:
        playv(gesture)

def run_train(username):
    params = audio.GenerateParams(db[username]["gestures"], verbose)
    db[username]["params"] = params
    db[username]["trained"] = True

def run_classify(username, verbose=True):
    classifier = audio.AudioClassifier(db[username]["params"])
    Run_On_Every_Frame(lambda frames: SendString(classifier.Classify(frames, verbose)))

def run_play(username):
    gesturename = raw_input("Gesture name: ")
    if (not db[username]["gestures"].has_key(gesturename)):
        print "No such gesture found for user: " + username
        return
    for gesture in db[username]["gestures"][gesturename]:
        out_stream.write(gesture)
    return
def run_save_audio(username):
    if (os.path.isdir(username)):
        ret = raw_input("Directory %s Exists. Would you like to delete it? (y,n)" % username)
        if (ret == "n"):
            return
        import shutil
        shutil.rmtree(username)
    os.mkdir(username)
    import scipy.io.wavfile
    import numpy
    for gesture in db[username]["gestures"]:
        # "clip" rather than "audio" so the audio module is not shadowed
        for i, clip in enumerate(db[username]["gestures"][gesture]):
            path = os.path.join(username, gesture + "_" + str(i) + ".wav")
            scipy.io.wavfile.write(path, RATE, numpy.frombuffer(clip, numpy.int16))

def run_view_avail_gestures(username):
    for i, gesture in enumerate(db[username]["gestures"]):
        print "\t" + str(i) + " : " + gesture
    print ""
def run_test_sequence(username):
    val = raw_input("Record unclassifiable audio samples? Currently have %s samples. (y,n): "
                    % len(db[username]["no_class_audio"]))
    if (val == "y"):
        Run_On_Every_Frame(lambda frames: db[username]["no_class_audio"].append(frames))

    # Split each gesture's samples into alternating train/test halves
    trainData = {}
    testData = {}
    import random
    for gesture in db[username]["gestures"]:
        train = True
        for smpl in db[username]["gestures"][gesture]:
            if (train):
                trainData.setdefault(gesture, []).append(smpl)
            else:
                testData.setdefault(gesture, []).append(smpl)
            train = not train

    aClassifier = audio.AudioClassifier(audio.GenerateParams(trainData, False))
    correctPredictions = {}
    incorrectPredictions = {}
    print ""
    for gesture in testData:
        for i, sample in enumerate(testData[gesture]):
            aClassification = aClassifier.Classify(sample, False)
            # Incorrect Classification
            if (aClassification != gesture):
                printv("FAIL : Incorrectly Classified %s as being %s" % (gesture, aClassification))
                playv(sample)
                incorrectPredictions.setdefault(gesture, {}).setdefault(aClassification, []).append(i)
            # Correct Classification
            else:
                printv("SUCCESS : Correctly Classified %s as being %s" % (gesture, gesture))
                correctPredictions.setdefault(gesture, []).append(i)

    correctNoClass = []
    incorrectNoClass = {}
    for i, no_class_audio in enumerate(db[username]["no_class_audio"]):
        aClassification = aClassifier.Classify(no_class_audio, False)
        # Incorrect No Class Classification
        if (aClassification != None):
            incorrectNoClass.setdefault(aClassification, []).append(i)
            printv("FAIL : Gave No Class Audio Label %s" % aClassification)
            playv(no_class_audio)
        # Correct No Class Assignment
        else:
            printv("SUCCESS : Gave Correct No Class Assignment")
            correctNoClass.append(i)

    view_test_results(correctPredictions, incorrectPredictions, correctNoClass,
                      incorrectNoClass, testData, db[username]["no_class_audio"])
def view_test_results(correctPredictions, incorrectPredictions, correctNoClass,
                      incorrectNoClass, testData, noClassTests):

    def printPercentages(numCorrect, total):
        if (total == 0):
            print "No Samples\n"
            return
        percentCorrect = int(100 * float(numCorrect) / total)
        percentIncorrect = 100 - percentCorrect
        printv("\tSUCCESS (%s%%)\t: %s" % (str(percentCorrect), "*" * (percentCorrect / 10)))
        printv("\tFAIL (%s%%)\t: %s" % (str(percentIncorrect), "*" * (percentIncorrect / 10)))

    # Per-gesture results
    for item in testData.keys():
        printv("\nGesture %s" % item)
        total_correct = 0
        if (item in correctPredictions):
            total_correct = len(correctPredictions[item])
        total = len(testData[item])
        printPercentages(total_correct, total)

    # Overall results over all labelled test samples (reset before accumulating)
    total_correct = 0
    total = 0
    for key in testData.keys():
        if (key in correctPredictions):
            total_correct += len(correctPredictions[key])
        total += len(testData[key])
    printv("\n\nTotals for Classification with Labels")
    printPercentages(total_correct, total)

    total_correct = len(correctNoClass)
    total = float(len(noClassTests))
    printv("\nTotals for Classification with No Labels")
    printPercentages(total_correct, total)
    printv("")
def Run_On_Every_Frame(execute):
    # Accumulate audio while the amplitude stays above THRESHOLD; when it
    # drops back down, hand the buffered utterance to the callback.
    frames = ""
    try:
        while (True):
            data = stream.read(CHUNK)
            amplitude = audioop.rms(data, 2)
            if (amplitude >= THRESHOLD):
                #printv("Amplitude: " + str(amplitude))
                frames += data
            elif (len(frames) > 0):
                execute(frames)
                frames = ""
    except KeyboardInterrupt:
        return

def SendString(string):
    if (string != None):
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        print username + "_" + string
        client_socket.sendto(username + "_" + string, (HOST, PORT))
        client_socket.close()

def printv(string):
    if (verbose):
        print string

def playv(audio):
    if (verbose):
        out_stream.write(audio)
try:
    ret = 1
    while (ret != 0):
        ret = main(username, method)
except KeyboardInterrupt:
    raise
finally:
    try:
        out_stream.stop_stream()
        out_stream.close()
        stream.stop_stream()
        stream.close()
        p.terminate()
    except Exception as e:
        print str(e)
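
# ---------------------------------------------------------------------------
# MFCC.py -- MFCC feature extraction
# ---------------------------------------------------------------------------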
###############################################################################
# Module for MFCC extraction
# By Maigo Yun Wang, 02/08/2012
###############################################################################
# Quick tutorial:
# import MFCC
# x = ... # x is a wave signal saved in a 1-D numpy array
# mfcc = MFCC.extract(x)
# # mfcc is a 2-D numpy array, where each row is the
# # MFCC of a frame in x
# mfcc = MFCC.extract(x, show = True)
# # This will also plot the MFCC and the spectrogram
# # reconstructed from MFCC by inverse DCT
###############################################################################
# Feel free to customize the parameters in the configuration block below
# (FS through POWER_SPECTRUM_FLOOR).
###############################################################################
#
# Modded to perform normalization before extraction
#
################################################################################
from numpy import *
from numpy.linalg import *
from matplotlib.pyplot import *
def hamming(n):
    """
    Generate a hamming window of n points as a numpy array.
    """
    return 0.54 - 0.46 * cos(2 * pi / n * (arange(n) + 0.5))
def melfb(p, n, fs):
    """
    Return a Mel filterbank matrix as a numpy array.
    Inputs:
        p:  number of filters in the filterbank
        n:  length of fft
        fs: sample rate in Hz
    Ref. http://www.ifp.illinois.edu/~minhdo/teaching/speaker_recognition/code/melfb.m
    """
    f0 = 700.0 / fs
    fn2 = int(floor(n / 2))
    lr = log(1 + 0.5 / f0) / (p + 1)
    CF = fs * f0 * (exp(arange(1, p + 1) * lr) - 1)
    bl = n * f0 * (exp(array([0, 1, p, p + 1]) * lr) - 1)
    b1 = int(floor(bl[0])) + 1
    b2 = int(ceil(bl[1]))
    b3 = int(floor(bl[2]))
    b4 = min(fn2, int(ceil(bl[3]))) - 1
    pf = log(1 + arange(b1, b4 + 1) / f0 / n) / lr
    fp = floor(pf)
    pm = pf - fp
    M = zeros((p, 1 + fn2))
    for c in range(b2 - 1, b4):
        r = fp[c] - 1
        M[r, c + 1] += 2 * (1 - pm[c])
    for c in range(b3):
        r = fp[c]
        M[r, c + 1] += 2 * pm[c]
    return M, CF
def dctmtx(n):
    """
    Return the DCT-II matrix of order n as a numpy array.
    """
    x, y = meshgrid(range(n), range(n))
    D = sqrt(2.0 / n) * cos(pi * (2 * x + 1) * y / (2 * n))
    D[0] /= sqrt(2)
    return D
FS = 16000 # Sampling rate
FRAME_LEN = int(0.02 * FS) # Frame length
FRAME_SHIFT = int(0.01 * FS) # Frame shift
FFT_SIZE = 2048 # How many points for FFT
WINDOW = hamming(FRAME_LEN) # Window function
PRE_EMPH = 0.95 # Pre-emphasis factor
BANDS = 40 # Number of Mel filters
COEFS = 13 # Number of Mel cepstra coefficients to keep
POWER_SPECTRUM_FLOOR = 1e-100 # Flooring for the power to avoid log(0)
M, CF = melfb(BANDS, FFT_SIZE, FS) # The Mel filterbank matrix and the center frequencies of each band
D = dctmtx(BANDS)[1:COEFS+1] # The DCT matrix. Change the index to [0:COEFS] if you want to keep the 0-th coefficient
invD = inv(dctmtx(BANDS))[:,1:COEFS+1] # The inverse DCT matrix. Change the index to [0:COEFS] if you want to keep the 0-th coefficient
def extract(x, show = False):
    """
    Extract MFCC coefficients of the sound x in numpy array format.
    """
    if x.ndim > 1:
        print "INFO: Input signal has more than 1 channel; the channels will be averaged."
        x = mean(x, axis=1)
    # Normalize the Sequence First
    #total = 0.0
    #for i in x: total += i**2
    #total = sqrt(total / len(x))
    #x = x / total
    frames = (len(x) - FRAME_LEN) / FRAME_SHIFT + 1
    feature = []
    for f in range(frames):
        # Windowing
        frame = x[f * FRAME_SHIFT : f * FRAME_SHIFT + FRAME_LEN] * WINDOW
        # Pre-emphasis
        frame[1:] -= frame[:-1] * PRE_EMPH
        # Power spectrum
        X = abs(fft.fft(frame, FFT_SIZE)[:FFT_SIZE / 2 + 1]) ** 2
        X[X < POWER_SPECTRUM_FLOOR] = POWER_SPECTRUM_FLOOR  # Avoid zero
        # Mel filtering, logarithm, DCT
        X = dot(D, log(dot(M, X)))
        feature.append(X)
    feature = row_stack(feature)
    # Show the MFCC spectrum before normalization
    if show:
        figure().show()
        subplot(2, 1, 2)
        show_MFCC_spectrum(feature)
    # Mean & variance normalization
    if feature.shape[0] > 1:
        mu = mean(feature, axis=0)
        sigma = std(feature, axis=0)
        feature = (feature - mu) / sigma
    # Show the MFCC
    if show:
        subplot(2, 1, 1)
        show_MFCC(feature)
        draw()
    return feature
def show_MFCC(mfcc):
    """
    Show the MFCC as an image.
    """
    imshow(mfcc.T, aspect="auto", interpolation="none")
    title("MFCC features")
    xlabel("Frame")
    ylabel("Dimension")
def show_MFCC_spectrum(mfcc):
    """
    Show the spectrum reconstructed from MFCC as an image.
    """
    imshow(dot(invD, mfcc.T), aspect="auto", interpolation="none", origin="lower")
    title("MFCC spectrum")
    xlabel("Frame")
    ylabel("Band")