Last active October 1, 2022 22:20
This notebook preprocesses the TIMIT dataset using MFCCs in the same way that the paper "LSTM: A Search Space Odyssey" used it.
"source": [
"from __future__ import division, absolute_import, print_function, unicode_literals\n",
"from random import shuffle\n",
"import os\n",
"import h5py\n",
"import numpy as np\n",
"import scikits.audiolab as al"
"# MFCC extraction\n",
"# By Maigo Yun Wang, 02/08/2012 adapted by Klaus Greff 2015\n",
"def melfb(p, n, fs):\n",
"    \"\"\"\n",
"    Build the Mel filterbank used for MFCC extraction.\n",
"\n",
"    Inputs:\n",
"        p:  number of filters in the filterbank\n",
"        n:  length of fft\n",
"        fs: sample rate in Hz\n",
"    Returns:\n",
"        M:  filterbank matrix, shape (p, 1 + floor(n/2))\n",
"        CF: centre frequency of each of the p filters, in Hz\n",
"    \"\"\"\n",
"    f0 = 700.0 / fs\n",
"    half_n = int(np.floor(n/2))\n",
"    # Filter spacing on the log(1 + f/700) axis, chosen so that band edge\n",
"    # p+1 falls exactly on fs/2.\n",
"    lr = np.log(1 + 0.5/f0) / (p+1)\n",
"    CF = fs * f0 * (np.exp(np.arange(1, p+1) * lr) - 1)\n",
"    # FFT-bin positions of the band edges with index 0, 1, p and p+1.\n",
"    edges = n * f0 * (np.exp(np.array([0, 1, p, p+1]) * lr) - 1)\n",
"    b1 = int(np.floor(edges[0])) + 1\n",
"    b2 = int(np.ceil(edges[1]))\n",
"    b3 = int(np.floor(edges[2]))\n",
"    b4 = min(half_n, int(np.ceil(edges[3]))) - 1\n",
"    # Fractional filter index of every FFT bin in b1..b4, split into its\n",
"    # integer part (fp) and remainder (pm).\n",
"    pf = np.log(1 + np.arange(b1, b4+1) / f0 / n) / lr\n",
"    fp = np.floor(pf)\n",
"    pm = pf - fp\n",
"    M = np.zeros((p, 1+half_n))\n",
"    # Share of each bin that goes to the filter centred at or below it.\n",
"    for c in range(b2-1, b4):\n",
"        M[int(fp[c]) - 1, c+1] += 2 * (1 - pm[c])\n",
"    # Share of each bin that goes to the filter centred above it.\n",
"    for c in range(b3):\n",
"        M[int(fp[c]), c+1] += 2 * pm[c]\n",
"    return M, CF\n",
"def dctmtx(n):\n",
"    \"\"\"\n",
"    Return the orthonormal DCT-II matrix of order n as a numpy array.\n",
"\n",
"    Row k holds the k-th cosine basis vector sampled at the n points (2*i+1)/(2*n).\n",
"    \"\"\"\n",
"    col = np.arange(n).reshape(1, -1)  # sample index, varies along a row\n",
"    row = np.arange(n).reshape(-1, 1)  # basis index, varies down a column\n",
"    D = np.sqrt(2.0/n) * np.cos(np.pi * (2*col+1) * row / (2*n))\n",
"    # The constant basis vector needs an extra 1/sqrt(2) to have unit norm.\n",
"    D[0] /= np.sqrt(2)\n",
"    return D\n",
"def extract(x):\n",
"    \"\"\"\n",
"    Extract MFCC coefficients of the sound x in numpy array format.\n",
"\n",
"    The constants below assume x is sampled at 16 kHz. An input with more than\n",
"    one dimension is averaged over axis 1 (the channels) first.\n",
"\n",
"    Returns an array of shape (frames, COEFS): one row per 25 ms frame, with a\n",
"    10 ms shift between frames. An input shorter than a single frame yields an\n",
"    empty array of shape (0, COEFS).\n",
"    \"\"\"\n",
"    FS = 16000                              # Sampling rate\n",
"    FRAME_LEN = int(0.025 * FS)             # Frame length\n",
"    FRAME_SHIFT = int(0.01 * FS)            # Frame shift\n",
"    FFT_SIZE = 2048                         # How many points for FFT\n",
"    WINDOW = np.hamming(FRAME_LEN)          # Window function\n",
"    PRE_EMPH = 0.97                         # Pre-emphasis factor\n",
"    BANDS = 40                              # Number of Mel filters\n",
"    COEFS = 13                              # Number of Mel cepstra coefficients to keep\n",
"    POWER_SPECTRUM_FLOOR = 1e-100           # Flooring for the power to avoid log(0)\n",
"    M, _ = melfb(BANDS, FFT_SIZE, FS)       # The Mel filterbank matrix (centre frequencies are not needed here)\n",
"    D = dctmtx(BANDS)[0:COEFS]              # First COEFS rows of the DCT matrix; the 0-th coefficient is kept\n",
"\n",
"    if x.ndim > 1:\n",
"        print(\"INFO: Input signal has more than 1 channel; the channels will be averaged.\")\n",
"        x = np.mean(x, axis=1)\n",
"    if len(x) < FRAME_LEN:\n",
"        # Not even one full frame: nothing to stack.\n",
"        return np.zeros((0, COEFS))\n",
"    frames = (len(x) - FRAME_LEN) // FRAME_SHIFT + 1\n",
"    feature = []\n",
"    for f in range(frames):\n",
"        first = f * FRAME_SHIFT\n",
"        # Windowing (the product is a new array, so x itself is never modified)\n",
"        frame = x[first:first + FRAME_LEN] * WINDOW\n",
"        # Pre-emphasis\n",
"        frame[1:] -= frame[:-1] * PRE_EMPH\n",
"        # Power spectrum; integer division keeps the slice bound an int\n",
"        X = np.abs(np.fft.fft(frame, FFT_SIZE)[:FFT_SIZE // 2 + 1]) ** 2\n",
"        # Floor the power so that silent frames do not produce log(0)\n",
"        X = np.maximum(X, POWER_SPECTRUM_FLOOR)\n",
"        # Mel filtering, logarithm, DCT\n",
"        X = np.dot(D, np.log(np.dot(M, X)))\n",
"        feature.append(X)\n",
"    return np.vstack(feature)"
"# Root directory of the TIMIT corpus; all sample paths are built relative to it.\n",
"TIMIT_DIR = '../timit'\n",
"# Disabled alternative: no transformation, the waveform is used as a single feature column.\n",
"#filename = 'timit.h5'\n",
"# no transformation\n",
"#DTYPE = np.float32\n",
"#extractor = lambda x: x.astype(DTYPE).reshape(-1, 1)\n",
"#preprocessing_description = \"Only minimal preprocessing (normalizing to zero mean and unit standard deviation).\"\n",
"# Active configuration: mfcc + 1st and 2nd deriv (see preprocessing_description below).\n",
"# filename is the HDF5 output file, extractor the per-utterance feature function,\n",
"# DTYPE the dtype used for the zero padding of features, labels and masks.\n",
"filename = 'timit_mfcc.h5'\n",
"extractor = extract\n",
"DTYPE = np.float64\n",
"preprocessing_description = \"\"\"Extracted 12 MFCCs coefficients + energy with window size of 25ms and 10ms step. \n",
"Used hamming window and a pre-emphasis coefficient of 0.97.\n",
"Also included 1st and 2nd time-derivative of the signal for a total of 39 feature dimensions.\"\"\""
"# The 61 TIMIT phone symbols; a phone's position in this list is its integer label.\n",
"phones = ['aa', 'ae', 'ah', 'ao', 'aw', 'ax', 'ax-h', 'axr', 'ay', 'b', 'bcl',\n",
"          'ch', 'd', 'dcl', 'dh', 'dx', 'eh', 'el', 'em', 'en', 'eng', 'epi',\n",
"          'er', 'ey', 'f', 'g', 'gcl', 'h#', 'hh', 'hv', 'ih', 'ix', 'iy',\n",
"          'jh', 'k', 'kcl', 'l', 'm', 'n', 'ng', 'nx', 'ow', 'oy', 'p', 'pau',\n",
"          'pcl', 'q', 'r', 's', 'sh', 't', 'tcl', 'th', 'uh', 'uw', 'ux', 'v',\n",
"          'w', 'y', 'z', 'zh']\n",
"# Label used for silence, e.g. when padding label sequences.\n",
"silence_label = phones.index('h#')\n",
"# Map every phone to its class in the reduced phone set: start from the\n",
"# identity mapping, then fold the phones listed below into their target class.\n",
"reduce_phones = {p: p for p in phones if p != 'q'}  # discard q\n",
"reduce_phones.update({\n",
"    # NOTE(review): the commonly used 39-class folding merges 'ao' (not 'ae')\n",
"    # into 'aa' -- confirm that this entry is intended.\n",
"    'ae': 'aa',\n",
"    'ax': 'ah', 'ax-h': 'ah',\n",
"    'axr': 'er',\n",
"    'hv': 'hh',\n",
"    'ix': 'ih',\n",
"    'el': 'l',\n",
"    'em': 'm',\n",
"    'en': 'n', 'nx': 'n',\n",
"    'eng': 'ng',\n",
"    'zh': 'sh',\n",
"    # closures, pauses and epenthetic silence all count as silence\n",
"    'pcl': 'h#', 'tcl': 'h#', 'kcl': 'h#', 'bcl': 'h#', 'dcl': 'h#', 'gcl': 'h#', 'pau': 'h#', 'epi': 'h#',\n",
"    'ux': 'uw'\n",
"})\n",
"class TimitSample(object):\n",
"    \"\"\"\n",
"    One TIMIT utterance, identified by the components of its path below TIMIT_DIR:\n",
"    <usage>/<dialect>/<sex><speaker_id>/<sentence_id>.<fileending>\n",
"\n",
"    start and stop (audio sample indices, None = unbounded) optionally restrict\n",
"    the audio, words and phones that the accessors return.\n",
"    \"\"\"\n",
"\n",
"    @classmethod\n",
"    def create(cls, directory, name):\n",
"        \"\"\"Build a sample from a directory and the name of a file inside it.\n",
"\n",
"        Only the part of name before the first '.' is used; the last four path\n",
"        components give usage, dialect, sex + speaker id and sentence id.\n",
"        \"\"\"\n",
"        f = os.path.join(directory, name.split('.')[0])\n",
"        # Normalise the platform separator so the split also works where os.sep is not '/'.\n",
"        f = f.replace(os.sep, '/').split('/')[-4:]\n",
"        return cls(f[0], f[1], f[2][0], f[2][1:], f[3])\n",
"\n",
"    def __init__(self, usage, dialect, sex, speaker_id, sentence_id,\n",
"                 start=None, stop=None):\n",
"        self.usage = usage\n",
"        self.dialect = dialect\n",
"        self.sex = sex\n",
"        self.speaker_id = speaker_id\n",
"        self.sentence_id = sentence_id\n",
"        self.start = start\n",
"        self.stop = stop\n",
"\n",
"    def _get_path(self, fileending):\n",
"        \"\"\"Return the path of this sample's file with the given ending (leading '.' optional).\"\"\"\n",
"        if not fileending.startswith('.'):\n",
"            fileending = '.' + fileending\n",
"        return os.path.join(TIMIT_DIR, self.usage, self.dialect,\n",
"                            self.sex + self.speaker_id,\n",
"                            self.sentence_id + fileending)\n",
"\n",
"    def _within_bounds(self, start, stop):\n",
"        \"\"\"True if the segment start..stop lies inside this sample's start/stop limits.\"\"\"\n",
"        return ((self.start is None or start >= self.start) and\n",
"                (self.stop is None or stop <= self.stop))\n",
"\n",
"    def _read_segments(self, fileending):\n",
"        \"\"\"Read a '<start> <stop> <text>' per-line file; return the in-bounds (start, stop, text) triples.\"\"\"\n",
"        with open(self._get_path(fileending), 'r') as f:\n",
"            # Blank lines carry no segment and would break the 3-way unpacking below.\n",
"            rows = [line.strip().split(' ', 2) for line in f if line.strip()]\n",
"        return [(int(start), int(stop), text)\n",
"                for start, stop, text in rows\n",
"                if self._within_bounds(int(start), int(stop))]\n",
"\n",
"    def get_sentence(self):\n",
"        \"\"\"Return (start, stop, sentence) as stored in the sample's .txt file.\"\"\"\n",
"        with open(self._get_path('txt'), 'r') as f:\n",
"            content = f.read()\n",
"        start, stop, sentence = content.split(' ', 2)\n",
"        return int(start), int(stop), sentence.strip()\n",
"\n",
"    def get_words(self):\n",
"        \"\"\"Return the in-bounds (start, stop, word) triples from the .wrd file.\"\"\"\n",
"        return self._read_segments('wrd')\n",
"\n",
"    def get_phones(self):\n",
"        \"\"\"Return the in-bounds (start, stop, phone, phone_index) tuples from the .phn file.\n",
"\n",
"        phone_index is the position of the phone in the module-level phones list.\n",
"        \"\"\"\n",
"        return [(start, stop, phone, phones.index(phone))\n",
"                for start, stop, phone in self._read_segments('phn')]\n",
"\n",
"    def get_audio_data(self):\n",
"        \"\"\"Return the samples of the .wav file as float64, cut to [start:stop].\"\"\"\n",
"        f = al.Sndfile(self._get_path('wav'), 'r')\n",
"        try:\n",
"            data = f.read_frames(f.nframes, dtype=np.float64)\n",
"        finally:\n",
"            f.close()  # release the file handle even if reading fails\n",
"        return data[self.start:self.stop]\n",
"\n",
"    def get_labels(self, frame_size=1, frame_shift=1):\n",
"        \"\"\"Return one phone index per analysis window as an np.byte array.\n",
"\n",
"        Every audio sample gets the index of the phone covering it (silence\n",
"        before the first phone); each window of frame_size samples, advanced\n",
"        by frame_shift, is labelled with its most frequent phone index.\n",
"        \"\"\"\n",
"        segments = self.get_phones()\n",
"        begin = self.start if self.start else 0\n",
"        per_sample = [silence_label] * (segments[0][0] - begin)\n",
"        for seg in segments:\n",
"            per_sample += [seg[3]] * (int(seg[1]) - int(seg[0]))\n",
"        end = segments[-1][1]\n",
"        window_starts = range(0, end - begin - frame_size + 1, frame_shift)\n",
"        window_stops = range(frame_size, end - begin + 1, frame_shift)\n",
"        labels = [np.bincount(per_sample[lo:hi]).argmax()\n",
"                  for lo, hi in zip(window_starts, window_stops)]\n",
"        return np.array(labels, dtype=np.byte)\n",
"\n",
"    def get_features(self, extractor, frame_size=1, frame_shift=1, derivatives=0):\n",
"        \"\"\"Return (features, labels) for this sample.\n",
"\n",
"        features is extractor(audio) with `derivatives` successive gradients\n",
"        along axis 0 appended column-wise; labels comes from get_labels().\n",
"        \"\"\"\n",
"        d = self.get_audio_data()\n",
"        features = extractor(d)\n",
"        feature_derivs = [features]\n",
"        for i in range(derivatives):\n",
"            # np.gradient returns one array per axis; [0] is the one along axis 0.\n",
"            feature_derivs.append(np.gradient(feature_derivs[-1])[0])\n",
"        all_features = np.hstack(feature_derivs)\n",
"        labels = self.get_labels(frame_size, frame_shift)\n",
"        return all_features, labels\n",
"\n",
"    def __unicode__(self):\n",
"        return '<TimitSample ' + '/'.join([self.usage, self.dialect,\n",
"                                           self.sex + self.speaker_id,\n",
"                                           self.sentence_id]) + '>'\n",
"\n",
"    # Python 3 never calls __unicode__; expose the same text through str().\n",
"    __str__ = __unicode__\n"
"def read_all_samples():\n",
"    \"\"\"\n",
"    Collect a TimitSample for every file ending in '.wav' below TIMIT_DIR.\n",
"\n",
"    Directories and files are visited in sorted order, so the returned list\n",
"    (and anything sliced or indexed from it) does not depend on the order in\n",
"    which the filesystem happens to list its entries.\n",
"    \"\"\"\n",
"    samples = []\n",
"    for dirname, dirnames, filenames in os.walk(TIMIT_DIR):\n",
"        dirnames.sort()  # in place: os.walk descends into sub-directories in this order\n",
"        samples += [TimitSample.create(dirname, n)\n",
"                    for n in sorted(filenames) if n.endswith('.wav')]\n",
"    return samples\n",
"def filter_samples(samples, usage=None, dialect=None, sex=None, speaker_id=None,\n",
"                   sentence_id=None):\n",
"    \"\"\"\n",
"    Return the samples whose attributes equal every criterion that is not None.\n",
"\n",
"    A criterion left at None is not checked, so calling this without any\n",
"    criteria returns all samples (as a new list, in their original order).\n",
"    \"\"\"\n",
"    criteria = [('usage', usage), ('dialect', dialect), ('sex', sex),\n",
"                ('speaker_id', speaker_id), ('sentence_id', sentence_id)]\n",
"    active = [(attr, wanted) for attr, wanted in criteria if wanted is not None]\n",
"    return [s for s in samples\n",
"            if all(getattr(s, attr) == wanted for attr, wanted in active)]\n"
"def get_features_and_labels_for(samples):\n",
"    \"\"\"\n",
"    Extract features and framewise labels for all samples and pad them to a common length.\n",
"\n",
"    Reads the notebook-level settings extractor, derivatives, frame_size,\n",
"    frame_shift, DTYPE and silence_label. Features are padded with zeros,\n",
"    labels with silence_label, and the mask is 1 for real frames and 0 for padding.\n",
"\n",
"    Returns (features, labels, masks), where features has shape\n",
"    (maxlen, n_samples, n_features) and labels and masks are reshaped to\n",
"    (maxlen, n_samples, 1); maxlen is the largest number of feature frames.\n",
"    \"\"\"\n",
"    per_sample = [s.get_features(extractor, derivatives=derivatives, frame_size=frame_size, frame_shift=frame_shift)\n",
"                  for s in samples]\n",
"\n",
"    maxlen = max(feats.shape[0] for feats, _ in per_sample)\n",
"    feature_blocks, label_rows, mask_rows = [], [], []\n",
"    for feats, labs in per_sample:\n",
"        missing_feats = maxlen - feats.shape[0]\n",
"        missing_labs = maxlen - labs.shape[0]\n",
"        feature_pad = np.zeros((missing_feats, feats.shape[1]), dtype=DTYPE)\n",
"        label_pad = np.ones(missing_labs, dtype=DTYPE) * silence_label\n",
"        mask_pad = np.zeros(missing_labs, dtype=DTYPE)\n",
"        feature_blocks.append(np.vstack((feats, feature_pad)))\n",
"        label_rows.append(np.hstack((labs, label_pad)))\n",
"        mask_rows.append(np.hstack((np.ones_like(labs), mask_pad)))\n",
"    # dstack puts the samples on the last axis; swap it in front of the feature axis.\n",
"    features = np.dstack(feature_blocks).swapaxes(1, 2)\n",
"    labels = np.vstack(label_rows).T.reshape(maxlen, -1, 1)\n",
"    masks = np.vstack(mask_rows).T.reshape(maxlen, -1, 1)\n",
"    return features, labels, masks"
"def get_padded_labels(samples, reduced=False):\n",
"    \"\"\"\n",
"    Return the phone-index sequence of every sample, padded with -1.\n",
"\n",
"    With reduced=False the indices are those returned by get_phones(). With\n",
"    reduced=True every phone is first mapped through reduce_phones and then\n",
"    looked up in reduced_phones; 'q' is dropped.\n",
"\n",
"    The result is an np.byte array of shape (longest sequence, n_samples, 1).\n",
"    \"\"\"\n",
"    # Plain comprehensions replace zip(*...)[i], which only works where zip returns a list.\n",
"    if not reduced:\n",
"        L = [[p[3] for p in s.get_phones()] for s in samples]\n",
"    else:\n",
"        L = [[reduced_phones.index(reduce_phones[p[2]])\n",
"              for p in s.get_phones() if p[2] != 'q']\n",
"             for s in samples]\n",
"\n",
"    L_len = max([len(l) for l in L])\n",
"    L_padded = -np.ones([L_len, len(L), 1], dtype=np.byte)\n",
"    for i, l in enumerate(L):\n",
"        L_padded[:len(l), i, 0] = l\n",
"    return L_padded"
"def get_means(input_data, mask=None):\n",
"    \"\"\"\n",
"    Compute the mean of every feature over a batch of sequences, using only\n",
"    the masked-in entries when a mask is given.\n",
"    @param input_data: Batch of sequences. shape = (time, sample, feature)\n",
"    @param mask: Optional mask for the sequences. shape = (time, sample, 1)\n",
"    @return: mean value for each feature. shape = (features, )\n",
"    \"\"\"\n",
"    if mask is None:\n",
"        return input_data[:, :, :].mean((0, 1))\n",
"    # One row per (time, sample) pair, in the same order as the flattened mask.\n",
"    rows = input_data[:, :, :].reshape(-1, input_data.shape[2])\n",
"    return rows[mask.flatten() == 1].mean(0)\n",
"def get_stds(input_data, mask=None, channel_mask=None):\n",
"    \"\"\"\n",
"    Compute the standard deviation of every feature over a batch of sequences,\n",
"    using only the masked-in entries when a mask is given.\n",
"    @param input_data: Batch of sequences. shape = (time, sample, feature)\n",
"    @param mask: Optional mask for the sequences. shape = (time, sample, 1)\n",
"    @param channel_mask: Accepted but not used by this function.\n",
"    @return: standard deviation of each feature. shape = (features, )\n",
"    \"\"\"\n",
"    if mask is None:\n",
"        return input_data[:, :, :].std((0, 1))\n",
"    # One row per (time, sample) pair, in the same order as the flattened mask.\n",
"    rows = input_data[:, :, :].reshape(-1, input_data.shape[2])\n",
"    return rows[mask.flatten() == 1].std(0)\n",
"def subtract_means(input_data, means, mask=None):\n",
"    \"\"\"\n",
"    Subtract the means from the masked-in entries of a batch of sequences X.\n",
"    This operation is performed in-place, i.e. the input_data will be modified.\n",
"    @param input_data: Batch of sequences. shape = (time, sample, feature)\n",
"    @param means: The means to subtract. shape = (features, )\n",
"    @param mask: Optional mask for the sequences. shape = (time, sample, 1)\n",
"    \"\"\"\n",
"    if mask is None:\n",
"        input_data[:, :, :] -= means\n",
"        return\n",
"    # The selection is the same for every feature, so build it only once.\n",
"    selected = mask[:, :, 0] == 1\n",
"    # One feature at a time keeps the temporary copy made by boolean indexing small.\n",
"    for i in range(input_data.shape[2]):\n",
"        input_data[:, :, i][selected] -= means[i]\n",
"def divide_by_stds(input_data, stds, mask=None):\n",
"    \"\"\"\n",
"    Divide masked-in entries of input_data by the stds.\n",
"    This operation is performed in-place, i.e. the input_data will be modified.\n",
"    @param input_data: Batch of sequences. shape = (time, sample, feature)\n",
"    @param stds: The standard deviations for every feature. shape = (features, )\n",
"    @param mask: Optional mask for the sequences. shape = (time, sample, 1)\n",
"    \"\"\"\n",
"    if mask is None:\n",
"        input_data[:, :, :] /= stds\n",
"        return\n",
"    # The selection is the same for every feature, so build it only once.\n",
"    selected = mask[:, :, 0] == 1\n",
"    # One feature at a time keeps the temporary copy made by boolean indexing small.\n",
"    for i in range(input_data.shape[2]):\n",
"        input_data[:, :, i][selected] /= stds[i]\n"
"# Build a TimitSample for every '.wav' file found below TIMIT_DIR.\n",
"all_samples = read_all_samples()"
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"# Keep only the first 10 entries of all_samples.\n",
"# NOTE(review): presumably a small subset for quick experiments -- confirm where `samples` is used.\n",
"samples = all_samples[:10]"
"# Open the output file in 'w' mode and add an 'original' group whose\n",
"# 'description' attribute documents the preprocessing and the dataset layout.\n",
"with h5py.File(filename, 'w') as f:\n",
" orig = f.create_group('original')\n",
" orig.attrs['description'] = \"\"\"\n",
" TIMIT\n",
" =====\n",
" \n",
" This is the original TIMIT dataset.\n",
" \n",
" Preprocessing\n",
" -------------\n",
" {}\n",
" \n",
" Content\n",
" -------\n",
" default: All audio data padded to be of equal length\n",
" targets: Phone index for each frame (same shape as default)\n",
" masks: Binary array indicating for each frame whether it is part of a sequence (1) or just padding (0)\n",
" labels: Integer array with all the phone indices for a labelling task (not framewise). Padded with -1\n",
" names: list of filenames in the original dataset for each sample\n",
" \"\"\".format(preprocessing_description)\n",
" "
"execution_count": 18,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"# Standardize X in place: subtract the per-feature means, then divide by the\n",
"# per-feature stds of the centred data. Only masked-in entries are used and changed.\n",
"# X and M come from cells not shown here; M is passed as the mask argument.\n",
"means = get_means(X, M)\n",
"subtract_means(X, means, M)\n",
"stds = get_stds(X, M)\n",
"divide_by_stds(X, stds, M)"
"source": [
"# Encoded path of each training sample's .txt file, in the order given by `shuffling`.\n",
"# train_samples and shuffling are defined in cells not shown here.\n",
"timit_train_names = [train_samples[i]._get_path(\".txt\").encode() for i in shuffling]"
"cell_type": "code",
"execution_count": 22,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"# Re-open the file in 'a' mode and store the training arrays under original/training.\n",
"# The three gzip-compressed datasets use chunks of shape (shape[0], 1, shape[2]),\n",
"# i.e. each chunk holds a single index along axis 1.\n",
"with h5py.File(filename, 'a') as f:\n",
" orig = f['original']\n",
" train = orig.create_group('training')\n",
" train.create_dataset('default', data=X, compression='gzip', chunks=(X.shape[0], 1, X.shape[2]))\n",
" train.create_dataset('targets', data=T, compression='gzip', chunks=(T.shape[0], 1, T.shape[2]))\n",
" train.create_dataset('masks', data=M, compression='gzip', chunks=(M.shape[0], 1, M.shape[2]))\n",
" train.create_dataset('labels', data=L)\n",
" train.create_dataset('names', data=np.array(timit_train_names))"
"cell_type": "code",
"execution_count": 26,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"# Normalize X_test in place with the `means` and `stds` that were computed from X;\n",
"# no statistics are recomputed on X_test.\n",
"subtract_means(X_test, means, M_test)\n",
"divide_by_stds(X_test, stds, M_test)"
"metadata": {},
"source": [
"# The reduced TIMIT dataset\n",
"See the PhD thesis of Andrew K. Halberstadt (1998)."
