Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Audio tools for numpy/python. Constant work in progress.
raise ValueError("DEPRECATED/FROZEN - see https://github.com/kastnerkyle/tools for the latest")
# License: BSD 3-clause
# Authors: Kyle Kastner
# Harvest, Cheaptrick, D4C, WORLD routines based on MATLAB code from M. Morise
# http://ml.cs.yamanashi.ac.jp/world/english/
# MGC code based on r9y9 (Ryuichi Yamamoto) MelGeneralizedCepstrums.jl
# Pieces also adapted from SPTK
from __future__ import division
import numpy as np
import scipy as sp
from numpy.lib.stride_tricks import as_strided
import scipy.signal as sg
from scipy.interpolate import interp1d
import wave
from scipy.cluster.vq import vq
from scipy import linalg, fftpack
from numpy.testing import assert_almost_equal
from scipy.linalg import svd
from scipy.io import wavfile
from scipy.signal import firwin
import zipfile
import tarfile
import os
import copy
import multiprocessing
from multiprocessing import Pool
import functools
import time
try:
import urllib.request as urllib # for backwards compatibility
except ImportError:
import urllib2 as urllib
def download(url, server_fname, local_fname=None, progress_update_percentage=5,
bypass_certificate_check=False):
"""
An internet download utility modified from
http://stackoverflow.com/questions/22676/
how-do-i-download-a-file-over-http-using-python/22776#22776
"""
if bypass_certificate_check:
import ssl
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
u = urllib.urlopen(url, context=ctx)
else:
u = urllib.urlopen(url)
if local_fname is None:
local_fname = server_fname
full_path = local_fname
meta = u.info()
with open(full_path, 'wb') as f:
try:
file_size = int(meta.get("Content-Length"))
except TypeError:
print("WARNING: Cannot get file size, displaying bytes instead!")
file_size = 100
print("Downloading: %s Bytes: %s" % (server_fname, file_size))
file_size_dl = 0
block_sz = int(1E7)
p = 0
while True:
buffer = u.read(block_sz)
if not buffer:
break
file_size_dl += len(buffer)
f.write(buffer)
if (file_size_dl * 100. / file_size) > p:
status = r"%10d [%3.2f%%]" % (file_size_dl, file_size_dl *
100. / file_size)
print(status)
p += progress_update_percentage
def fetch_sample_speech_tapestry():
url = "https://www.dropbox.com/s/qte66a7haqspq2g/tapestry.wav?dl=1"
wav_path = "tapestry.wav"
if not os.path.exists(wav_path):
download(url, wav_path)
fs, d = wavfile.read(wav_path)
d = d.astype('float32') / (2 ** 15)
# file is stereo? - just choose one channel
return fs, d
def fetch_sample_file(wav_path):
if not os.path.exists(wav_path):
raise ValueError("Unable to find file at path %s" % wav_path)
fs, d = wavfile.read(wav_path)
d = d.astype('float32') / (2 ** 15)
# file is stereo - just choose one channel
if len(d.shape) > 1:
d = d[:, 0]
return fs, d
def fetch_sample_music():
url = "http://www.music.helsinki.fi/tmt/opetus/uusmedia/esim/"
url += "a2002011001-e02-16kHz.wav"
wav_path = "test.wav"
if not os.path.exists(wav_path):
download(url, wav_path)
fs, d = wavfile.read(wav_path)
d = d.astype('float32') / (2 ** 15)
# file is stereo - just choose one channel
d = d[:, 0]
return fs, d
def fetch_sample_speech_fruit(n_samples=None):
url = 'https://dl.dropboxusercontent.com/u/15378192/audio.tar.gz'
wav_path = "audio.tar.gz"
if not os.path.exists(wav_path):
download(url, wav_path)
tf = tarfile.open(wav_path)
wav_names = [fname for fname in tf.getnames()
if ".wav" in fname.split(os.sep)[-1]]
speech = []
print("Loading speech files...")
for wav_name in wav_names[:n_samples]:
f = tf.extractfile(wav_name)
fs, d = wavfile.read(f)
d = d.astype('float32') / (2 ** 15)
speech.append(d)
return fs, speech
def fetch_sample_speech_eustace(n_samples=None):
"""
http://www.cstr.ed.ac.uk/projects/eustace/download.html
"""
# data
url = "http://www.cstr.ed.ac.uk/projects/eustace/down/eustace_wav.zip"
wav_path = "eustace_wav.zip"
if not os.path.exists(wav_path):
download(url, wav_path)
# labels
url = "http://www.cstr.ed.ac.uk/projects/eustace/down/eustace_labels.zip"
labels_path = "eustace_labels.zip"
if not os.path.exists(labels_path):
download(url, labels_path)
# Read wavfiles
# 16 kHz wav
zf = zipfile.ZipFile(wav_path, 'r')
wav_names = [fname for fname in zf.namelist()
if ".wav" in fname.split(os.sep)[-1]]
fs = 16000
speech = []
print("Loading speech files...")
for wav_name in wav_names[:n_samples]:
wav_str = zf.read(wav_name)
d = np.frombuffer(wav_str, dtype=np.int16)
d = d.astype('float32') / (2 ** 15)
speech.append(d)
zf = zipfile.ZipFile(labels_path, 'r')
label_names = [fname for fname in zf.namelist()
if ".lab" in fname.split(os.sep)[-1]]
labels = []
print("Loading label files...")
for label_name in label_names[:n_samples]:
label_file_str = zf.read(label_name)
labels.append(label_file_str)
return fs, speech
def stft(X, fftsize=128, step="half", mean_normalize=True, real=False,
compute_onesided=True):
"""
Compute STFT for 1D real valued input X
"""
if real:
local_fft = fftpack.rfft
cut = -1
else:
local_fft = fftpack.fft
cut = None
if compute_onesided:
cut = fftsize // 2 + 1
if mean_normalize:
X -= X.mean()
if step == "half":
X = halfoverlap(X, fftsize)
else:
X = overlap(X, fftsize, step)
size = fftsize
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
X = X * win[None]
X = local_fft(X)[:, :cut]
return X
def istft(X, fftsize=128, step="half", wsola=False, mean_normalize=True,
real=False, compute_onesided=True):
"""
Compute ISTFT for STFT transformed X
"""
if real:
local_ifft = fftpack.irfft
X_pad = np.zeros((X.shape[0], X.shape[1] + 1)) + 0j
X_pad[:, :-1] = X
X = X_pad
else:
local_ifft = fftpack.ifft
if compute_onesided:
X_pad = np.zeros((X.shape[0], 2 * X.shape[1])) + 0j
X_pad[:, :fftsize // 2 + 1] = X
X_pad[:, fftsize // 2 + 1:] = 0
X = X_pad
X = local_ifft(X).astype("float64")
if step == "half":
X = invert_halfoverlap(X)
else:
X = overlap_add(X, step, wsola=wsola)
if mean_normalize:
X -= np.mean(X)
return X
def mdct_slow(X, dctsize=128):
M = dctsize
N = 2 * dctsize
N_0 = (M + 1) / 2
X = halfoverlap(X, N)
X = sine_window(X)
n, k = np.meshgrid(np.arange(N), np.arange(M))
# Use transpose due to "samples as rows" convention
tf = np.cos(np.pi * (n + N_0) * (k + 0.5) / M).T
return np.dot(X, tf)
def imdct_slow(X, dctsize=128):
M = dctsize
N = 2 * dctsize
N_0 = (M + 1) / 2
N_4 = N / 4
n, k = np.meshgrid(np.arange(N), np.arange(M))
# inverse *is not* transposed
tf = np.cos(np.pi * (n + N_0) * (k + 0.5) / M)
X_r = np.dot(X, tf) / N_4
X_r = sine_window(X_r)
X = invert_halfoverlap(X_r)
return X
def nsgcwin(fmin, fmax, n_bins, fs, signal_len, gamma):
"""
Nonstationary Gabor window calculation
References
----------
Velasco G. A., Holighaus N., Dorfler M., Grill T.
Constructing an invertible constant-Q transform with nonstationary Gabor
frames, Proceedings of the 14th International Conference on Digital
Audio Effects (DAFx 11), Paris, France, 2011
Holighaus N., Dorfler M., Velasco G. A. and Grill T.
A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows:
AUTHOR(s) : Monika Dorfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011
COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
http://nuhag.eu/
Permission is granted to modify and re-distribute this
code in any manner as long as this notice is preserved.
All standard disclaimers apply.
"""
# use a hanning window
# no fractional shifts
fftres = fs / signal_len
fmin = float(fmin)
fmax = float(fmax)
gamma = float(gamma)
nyq = fs / 2.
b = np.floor(n_bins * np.log2(fmax / fmin))
fbas = fmin * 2 ** (np.arange(b + 1) / float(n_bins))
Q = 2 ** (1. / n_bins) - 2 ** (-1. / n_bins)
cqtbw = Q * fbas + gamma
cqtbw = cqtbw.ravel()
maxidx = np.where(fbas + cqtbw / 2. > nyq)[0]
if len(maxidx) > 0:
# replicate bug in MATLAB version...
# or is it a feature
if sum(maxidx) == 0:
first = len(cqtbw) - 1
else:
first = maxidx[0]
fbas = fbas[:first]
cqtbw = cqtbw[:first]
minidx = np.where(fbas - cqtbw / 2. < 0)[0]
if len(minidx) > 0:
fbas = fbas[minidx[-1]+1:]
cqtbw = cqtbw[minidx[-1]+1:]
fbas_len = len(fbas)
fbas_new = np.zeros((2 * (len(fbas) + 1)))
fbas_new[1:len(fbas) + 1] = fbas
fbas = fbas_new
fbas[fbas_len + 1] = nyq
fbas[fbas_len + 2:] = fs - fbas[1:fbas_len + 1][::-1]
bw = np.zeros_like(fbas)
bw[0] = 2 * fmin
bw[1:len(cqtbw) + 1] = cqtbw
bw[len(cqtbw) + 1] = fbas[fbas_len + 2] - fbas[fbas_len]
bw[-len(cqtbw):] = cqtbw[::-1]
bw = bw / fftres
fbas = fbas / fftres
posit = np.zeros_like(fbas)
posit[:fbas_len + 2] = np.floor(fbas[:fbas_len + 2])
posit[fbas_len + 2:] = np.ceil(fbas[fbas_len + 2:])
base_shift = -posit[-1] % signal_len
shift = np.zeros_like(posit).astype("int32")
shift[1:] = (posit[1:] - posit[:-1]).astype("int32")
shift[0] = base_shift
bw = np.round(bw)
bwfac = 1
M = bw
min_win = 4
for ii in range(len(bw)):
if bw[ii] < min_win:
bw[ii] = min_win
M[ii] = bw[ii]
def _win(numel):
if numel % 2 == 0:
s1 = np.arange(0, .5, 1. / numel)
if len(s1) != numel // 2:
# edge case with small floating point numbers...
s1 = s1[:-1]
s2 = np.arange(-.5, 0, 1. / numel)
if len(s2) != numel // 2:
# edge case with small floating point numbers...
s2 = s2[:-1]
x = np.concatenate((s1, s2))
else:
s1 = np.arange(0, .5, 1. / numel)
s2 = np.arange(-.5 + .5 / numel, 0, 1. / numel)
if len(s2) != numel // 2: # assume integer truncate 27 // 2 = 13
s2 = s2[:-1]
x = np.concatenate((s1, s2))
assert len(x) == numel
g = .5 + .5 * np.cos(2 * np.pi * x)
return g
multiscale = [_win(bi) for bi in bw]
bw = bwfac * np.ceil(M / bwfac)
for kk in [0, fbas_len + 1]:
if M[kk] > M[kk + 1]:
multiscale[kk] = np.ones(M[kk]).astype(multiscale[0].dtype)
i1 = np.floor(M[kk] / 2) - np.floor(M[kk + 1] / 2)
i2 = np.floor(M[kk] / 2) + np.ceil(M[kk + 1] / 2)
# Very rarely, gets an off by 1 error? Seems to be at the end...
# for now, slice
multiscale[kk][i1:i2] = _win(M[kk + 1])
multiscale[kk] = multiscale[kk] / np.sqrt(M[kk])
return multiscale, shift, M
def nsgtf_real(X, multiscale, shift, window_lens):
"""
Nonstationary Gabor Transform for real values
References
----------
Velasco G. A., Holighaus N., Dorfler M., Grill T.
Constructing an invertible constant-Q transform with nonstationary Gabor
frames, Proceedings of the 14th International Conference on Digital
Audio Effects (DAFx 11), Paris, France, 2011
Holighaus N., Dorfler M., Velasco G. A. and Grill T.
A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows:
AUTHOR(s) : Monika Dorfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011
COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
http://nuhag.eu/
Permission is granted to modify and re-distribute this
code in any manner as long as this notice is preserved.
All standard disclaimers apply.
"""
# This will break with multchannel input
signal_len = len(X)
N = len(shift)
X_fft = np.fft.fft(X)
fill = np.sum(shift) - signal_len
if fill > 0:
X_fft_tmp = np.zeros((signal_len + shift))
X_fft_tmp[:len(X_fft)] = X_fft
X_fft = X_fft_tmp
posit = np.cumsum(shift) - shift[0]
scale_lens = np.array([len(m) for m in multiscale])
N = np.where(posit - np.floor(scale_lens) <= (signal_len + fill) / 2)[0][-1]
c = []
# c[0] is almost exact
for ii in range(N):
idx_l = np.arange(np.ceil(scale_lens[ii] / 2), scale_lens[ii])
idx_r = np.arange(np.ceil(scale_lens[ii] / 2))
idx = np.concatenate((idx_l, idx_r))
idx = idx.astype("int32")
subwin_range = posit[ii] + np.arange(-np.floor(scale_lens[ii] / 2),
np.ceil(scale_lens[ii] / 2))
win_range = subwin_range % (signal_len + fill)
win_range = win_range.astype("int32")
if window_lens[ii] < scale_lens[ii]:
raise ValueError("Not handling 'not enough channels' case")
else:
temp = np.zeros((window_lens[ii],)).astype(X_fft.dtype)
temp_idx_l = np.arange(len(temp) - np.floor(scale_lens[ii] / 2),
len(temp))
temp_idx_r = np.arange(np.ceil(scale_lens[ii] / 2))
temp_idx = np.concatenate((temp_idx_l, temp_idx_r))
temp_idx = temp_idx.astype("int32")
temp[temp_idx] = X_fft[win_range] * multiscale[ii][idx]
fs_new_bins = window_lens[ii]
fk_bins = posit[ii]
displace = fk_bins - np.floor(fk_bins / fs_new_bins) * fs_new_bins
displace = displace.astype("int32")
temp = np.roll(temp, displace)
c.append(np.fft.ifft(temp))
if 0:
# cell2mat concatenation
c = np.concatenate(c)
return c
def nsdual(multiscale, shift, window_lens):
"""
Calculation of nonstationary inverse gabor filters
References
----------
Velasco G. A., Holighaus N., Dorfler M., Grill T.
Constructing an invertible constant-Q transform with nonstationary Gabor
frames, Proceedings of the 14th International Conference on Digital
Audio Effects (DAFx 11), Paris, France, 2011
Holighaus N., Dorfler M., Velasco G. A. and Grill T.
A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows:
AUTHOR(s) : Monika Dorfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011
COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
http://nuhag.eu/
Permission is granted to modify and re-distribute this
code in any manner as long as this notice is preserved.
All standard disclaimers apply.
"""
N = len(shift)
posit = np.cumsum(shift)
seq_len = posit[-1]
posit = posit - shift[0]
diagonal = np.zeros((seq_len,))
win_range = []
for ii in range(N):
filt_len = len(multiscale[ii])
idx = np.arange(-np.floor(filt_len / 2), np.ceil(filt_len / 2))
win_range.append((posit[ii] + idx) % seq_len)
subdiag = window_lens[ii] * np.fft.fftshift(multiscale[ii]) ** 2
ind = win_range[ii].astype(np.int)
diagonal[ind] = diagonal[ind] + subdiag
dual_multiscale = multiscale
for ii in range(N):
ind = win_range[ii].astype(np.int)
dual_multiscale[ii] = np.fft.ifftshift(
np.fft.fftshift(dual_multiscale[ii]) / diagonal[ind])
return dual_multiscale
def nsgitf_real(c, c_dc, c_nyq, multiscale, shift):
"""
Nonstationary Inverse Gabor Transform on real valued signal
References
----------
Velasco G. A., Holighaus N., Dorfler M., Grill T.
Constructing an invertible constant-Q transform with nonstationary Gabor
frames, Proceedings of the 14th International Conference on Digital
Audio Effects (DAFx 11), Paris, France, 2011
Holighaus N., Dorfler M., Velasco G. A. and Grill T.
A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows:
AUTHOR(s) : Monika Dorfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011
COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
http://nuhag.eu/
Permission is granted to modify and re-distribute this
code in any manner as long as this notice is preserved.
All standard disclaimers apply.
"""
c_l = []
c_l.append(c_dc)
c_l.extend([ci for ci in c])
c_l.append(c_nyq)
posit = np.cumsum(shift)
seq_len = posit[-1]
posit -= shift[0]
out = np.zeros((seq_len,)).astype(c_l[1].dtype)
for ii in range(len(c_l)):
filt_len = len(multiscale[ii])
win_range = posit[ii] + np.arange(-np.floor(filt_len / 2),
np.ceil(filt_len / 2))
win_range = (win_range % seq_len).astype(np.int)
temp = np.fft.fft(c_l[ii]) * len(c_l[ii])
fs_new_bins = len(c_l[ii])
fk_bins = posit[ii]
displace = int(fk_bins - np.floor(fk_bins / fs_new_bins) * fs_new_bins)
temp = np.roll(temp, -displace)
l = np.arange(len(temp) - np.floor(filt_len / 2), len(temp))
r = np.arange(np.ceil(filt_len / 2))
temp_idx = (np.concatenate((l, r)) % len(temp)).astype(np.int)
temp = temp[temp_idx]
lf = np.arange(filt_len - np.floor(filt_len / 2), filt_len)
rf = np.arange(np.ceil(filt_len / 2))
filt_idx = np.concatenate((lf, rf)).astype(np.int)
m = multiscale[ii][filt_idx]
out[win_range] = out[win_range] + m * temp
nyq_bin = np.floor(seq_len / 2) + 1
out_idx = np.arange(
nyq_bin - np.abs(1 - seq_len % 2) - 1, 0, -1).astype(np.int)
out[nyq_bin:] = np.conj(out[out_idx])
t_out = np.real(np.fft.ifft(out)).astype(np.float64)
return t_out
def cqt(X, fs, n_bins=48, fmin=27.5, fmax="nyq", gamma=20):
"""
Constant Q Transform
References
----------
Velasco G. A., Holighaus N., Dorfler M., Grill T.
Constructing an invertible constant-Q transform with nonstationary Gabor
frames, Proceedings of the 14th International Conference on Digital
Audio Effects (DAFx 11), Paris, France, 2011
Holighaus N., Dorfler M., Velasco G. A. and Grill T.
A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows:
AUTHOR(s) : Monika Dorfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011
COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
http://nuhag.eu/
Permission is granted to modify and re-distribute this
code in any manner as long as this notice is preserved.
All standard disclaimers apply.
"""
if fmax == "nyq":
fmax = fs / 2.
multiscale, shift, window_lens = nsgcwin(fmin, fmax, n_bins, fs,
len(X), gamma)
fbas = fs * np.cumsum(shift[1:]) / len(X)
fbas = fbas[:len(window_lens) // 2 - 1]
bins = window_lens.shape[0] // 2 - 1
window_lens[1:bins + 1] = window_lens[bins + 2]
window_lens[bins + 2:] = window_lens[1:bins + 1][::-1]
norm = 2. * window_lens[:bins + 2] / float(len(X))
norm = np.concatenate((norm, norm[1:-1][::-1]))
multiscale = [norm[ii] * multiscale[ii] for ii in range(2 * (bins + 1))]
c = nsgtf_real(X, multiscale, shift, window_lens)
c_dc = c[0]
c_nyq = c[-1]
c_sub = c[1:-1]
c = np.vstack(c_sub)
return c, c_dc, c_nyq, multiscale, shift, window_lens
def icqt(X_cq, c_dc, c_nyq, multiscale, shift, window_lens):
"""
Inverse constant Q Transform
References
----------
Velasco G. A., Holighaus N., Dorfler M., Grill T.
Constructing an invertible constant-Q transform with nonstationary Gabor
frames, Proceedings of the 14th International Conference on Digital
Audio Effects (DAFx 11), Paris, France, 2011
Holighaus N., Dorfler M., Velasco G. A. and Grill T.
A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows:
AUTHOR(s) : Monika Dorfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011
COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
http://nuhag.eu/
Permission is granted to modify and re-distribute this
code in any manner as long as this notice is preserved.
All standard disclaimers apply.
"""
new_multiscale = nsdual(multiscale, shift, window_lens)
X = nsgitf_real(X_cq, c_dc, c_nyq, new_multiscale, shift)
return X
def rolling_mean(X, window_size):
w = 1.0 / window_size * np.ones((window_size))
return np.correlate(X, w, 'valid')
def rolling_window(X, window_size):
# for 1d data
shape = X.shape[:-1] + (X.shape[-1] - window_size + 1, window_size)
strides = X.strides + (X.strides[-1],)
return np.lib.stride_tricks.as_strided(X, shape=shape, strides=strides)
def voiced_unvoiced(X, window_size=256, window_step=128, copy=True):
"""
Voiced unvoiced detection from a raw signal
Based on code from:
https://www.clear.rice.edu/elec532/PROJECTS96/lpc/code.html
Other references:
http://www.seas.ucla.edu/spapl/code/harmfreq_MOLRT_VAD.m
Parameters
----------
X : ndarray
Raw input signal
window_size : int, optional (default=256)
The window size to use, in samples.
window_step : int, optional (default=128)
How far the window steps after each calculation, in samples.
copy : bool, optional (default=True)
Whether to make a copy of the input array or allow in place changes.
"""
X = np.array(X, copy=copy)
if len(X.shape) < 2:
X = X[None]
n_points = X.shape[1]
n_windows = n_points // window_step
# Padding
pad_sizes = [(window_size - window_step) // 2,
window_size - window_step // 2]
# TODO: Handling for odd window sizes / steps
X = np.hstack((np.zeros((X.shape[0], pad_sizes[0])), X,
np.zeros((X.shape[0], pad_sizes[1]))))
clipping_factor = 0.6
b, a = sg.butter(10, np.pi * 9 / 40)
voiced_unvoiced = np.zeros((n_windows, 1))
period = np.zeros((n_windows, 1))
for window in range(max(n_windows - 1, 1)):
XX = X.ravel()[window * window_step + np.arange(window_size)]
XX *= sg.hamming(len(XX))
XX = sg.lfilter(b, a, XX)
left_max = np.max(np.abs(XX[:len(XX) // 3]))
right_max = np.max(np.abs(XX[-len(XX) // 3:]))
clip_value = clipping_factor * np.min([left_max, right_max])
XX_clip = np.clip(XX, clip_value, -clip_value)
XX_corr = np.correlate(XX_clip, XX_clip, mode='full')
center = np.argmax(XX_corr)
right_XX_corr = XX_corr[center:]
prev_window = max([window - 1, 0])
if voiced_unvoiced[prev_window] > 0:
# Want it to be harder to turn off than turn on
strength_factor = .29
else:
strength_factor = .3
start = np.where(right_XX_corr < .3 * XX_corr[center])[0]
# 20 is hardcoded but should depend on samplerate?
try:
start = np.max([20, start[0]])
except IndexError:
start = 20
search_corr = right_XX_corr[start:]
index = np.argmax(search_corr)
second_max = search_corr[index]
if (second_max > strength_factor * XX_corr[center]):
voiced_unvoiced[window] = 1
period[window] = start + index - 1
else:
voiced_unvoiced[window] = 0
period[window] = 0
return np.array(voiced_unvoiced), np.array(period)
def lpc_analysis(X, order=8, window_step=128, window_size=2 * 128,
emphasis=0.9, voiced_start_threshold=.9,
voiced_stop_threshold=.6, truncate=False, copy=True):
"""
Extract LPC coefficients from a signal
Based on code from:
http://labrosa.ee.columbia.edu/matlab/sws/
_rParameters
----------
X : ndarray
Signals to extract LPC coefficients from
order : int, optional (default=8)
Order of the LPC coefficients. For speech, use the general rule that the
order is two times the expected number of formants plus 2.
This can be formulated as 2 + 2 * (fs // 2000). For approx. signals
with fs = 7000, this is 8 coefficients - 2 + 2 * (7000 // 2000).
window_step : int, optional (default=128)
The size (in samples) of the space between each window
window_size : int, optional (default=2 * 128)
The size of each window (in samples) to extract coefficients over
emphasis : float, optional (default=0.9)
The emphasis coefficient to use for filtering
voiced_start_threshold : float, optional (default=0.9)
Upper power threshold for estimating when speech has started
voiced_stop_threshold : float, optional (default=0.6)
Lower power threshold for estimating when speech has stopped
truncate : bool, optional (default=False)
Whether to cut the data at the last window or do zero padding.
copy : bool, optional (default=True)
Whether to copy the input X or modify in place
Returns
-------
lp_coefficients : ndarray
lp coefficients to describe the frame
per_frame_gain : ndarray
calculated gain for each frame
residual_excitation : ndarray
leftover energy which is not described by lp coefficents and gain
voiced_frames : ndarray
array of [0, 1] values which holds voiced/unvoiced decision for each
frame.
References
----------
D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
"""
X = np.array(X, copy=copy)
if len(X.shape) < 2:
X = X[None]
n_points = X.shape[1]
n_windows = int(n_points // window_step)
if not truncate:
pad_sizes = [(window_size - window_step) // 2,
window_size - window_step // 2]
# TODO: Handling for odd window sizes / steps
X = np.hstack((np.zeros((X.shape[0], int(pad_sizes[0]))), X,
np.zeros((X.shape[0], int(pad_sizes[1])))))
else:
pad_sizes = [0, 0]
X = X[0, :n_windows * window_step]
lp_coefficients = np.zeros((n_windows, order + 1))
per_frame_gain = np.zeros((n_windows, 1))
residual_excitation = np.zeros(
int(((n_windows - 1) * window_step + window_size)))
# Pre-emphasis high-pass filter
X = sg.lfilter([1, -emphasis], 1, X)
# stride_tricks.as_strided?
autocorr_X = np.zeros((n_windows, int(2 * window_size - 1)))
for window in range(max(n_windows - 1, 1)):
wtws = int(window * window_step)
XX = X.ravel()[wtws + np.arange(window_size, dtype="int32")]
WXX = XX * sg.hanning(window_size)
autocorr_X[window] = np.correlate(WXX, WXX, mode='full')
center = np.argmax(autocorr_X[window])
RXX = autocorr_X[window,
np.arange(center, window_size + order, dtype="int32")]
R = linalg.toeplitz(RXX[:-1])
solved_R = linalg.pinv(R).dot(RXX[1:])
filter_coefs = np.hstack((1, -solved_R))
residual_signal = sg.lfilter(filter_coefs, 1, WXX)
gain = np.sqrt(np.mean(residual_signal ** 2))
lp_coefficients[window] = filter_coefs
per_frame_gain[window] = gain
assign_range = wtws + np.arange(window_size, dtype="int32")
residual_excitation[assign_range] += residual_signal / gain
# Throw away first part in overlap mode for proper synthesis
residual_excitation = residual_excitation[int(pad_sizes[0]):]
return lp_coefficients, per_frame_gain, residual_excitation
def lpc_to_frequency(lp_coefficients, per_frame_gain):
"""
Extract resonant frequencies and magnitudes from LPC coefficients and gains.
Parameters
----------
lp_coefficients : ndarray
LPC coefficients, such as those calculated by ``lpc_analysis``
per_frame_gain : ndarray
Gain calculated for each frame, such as those calculated
by ``lpc_analysis``
Returns
-------
frequencies : ndarray
Resonant frequencies calculated from LPC coefficients and gain. Returned
frequencies are from 0 to 2 * pi
magnitudes : ndarray
Magnitudes of resonant frequencies
References
----------
D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
"""
n_windows, order = lp_coefficients.shape
frame_frequencies = np.zeros((n_windows, (order - 1) // 2))
frame_magnitudes = np.zeros_like(frame_frequencies)
for window in range(n_windows):
w_coefs = lp_coefficients[window]
g_coefs = per_frame_gain[window]
roots = np.roots(np.hstack(([1], w_coefs[1:])))
# Roots doesn't return the same thing as MATLAB... agh
frequencies, index = np.unique(
np.abs(np.angle(roots)), return_index=True)
# Make sure 0 doesn't show up...
gtz = np.where(frequencies > 0)[0]
frequencies = frequencies[gtz]
index = index[gtz]
magnitudes = g_coefs / (1. - np.abs(roots))
sort_index = np.argsort(frequencies)
frame_frequencies[window, :len(sort_index)] = frequencies[sort_index]
frame_magnitudes[window, :len(sort_index)] = magnitudes[sort_index]
return frame_frequencies, frame_magnitudes
def lpc_to_lsf(all_lpc):
if len(all_lpc.shape) < 2:
all_lpc = all_lpc[None]
order = all_lpc.shape[1] - 1
all_lsf = np.zeros((len(all_lpc), order))
for i in range(len(all_lpc)):
lpc = all_lpc[i]
lpc1 = np.append(lpc, 0)
lpc2 = lpc1[::-1]
sum_filt = lpc1 + lpc2
diff_filt = lpc1 - lpc2
if order % 2 != 0:
deconv_diff, _ = sg.deconvolve(diff_filt, [1, 0, -1])
deconv_sum = sum_filt
else:
deconv_diff, _ = sg.deconvolve(diff_filt, [1, -1])
deconv_sum, _ = sg.deconvolve(sum_filt, [1, 1])
roots_diff = np.roots(deconv_diff)
roots_sum = np.roots(deconv_sum)
angle_diff = np.angle(roots_diff[::2])
angle_sum = np.angle(roots_sum[::2])
lsf = np.sort(np.hstack((angle_diff, angle_sum)))
if len(lsf) != 0:
all_lsf[i] = lsf
return np.squeeze(all_lsf)
def lsf_to_lpc(all_lsf):
if len(all_lsf.shape) < 2:
all_lsf = all_lsf[None]
order = all_lsf.shape[1]
all_lpc = np.zeros((len(all_lsf), order + 1))
for i in range(len(all_lsf)):
lsf = all_lsf[i]
zeros = np.exp(1j * lsf)
sum_zeros = zeros[::2]
diff_zeros = zeros[1::2]
sum_zeros = np.hstack((sum_zeros, np.conj(sum_zeros)))
diff_zeros = np.hstack((diff_zeros, np.conj(diff_zeros)))
sum_filt = np.poly(sum_zeros)
diff_filt = np.poly(diff_zeros)
if order % 2 != 0:
deconv_diff = sg.convolve(diff_filt, [1, 0, -1])
deconv_sum = sum_filt
else:
deconv_diff = sg.convolve(diff_filt, [1, -1])
deconv_sum = sg.convolve(sum_filt, [1, 1])
lpc = .5 * (deconv_sum + deconv_diff)
# Last coefficient is 0 and not returned
all_lpc[i] = lpc[:-1]
return np.squeeze(all_lpc)
def lpc_synthesis(lp_coefficients, per_frame_gain, residual_excitation=None,
voiced_frames=None, window_step=128, emphasis=0.9):
"""
Synthesize a signal from LPC coefficients
Based on code from:
http://labrosa.ee.columbia.edu/matlab/sws/
http://web.uvic.ca/~tyoon/resource/auditorytoolbox/auditorytoolbox/synlpc.html
Parameters
----------
lp_coefficients : ndarray
Linear prediction coefficients
per_frame_gain : ndarray
Gain coefficients
residual_excitation : ndarray or None, optional (default=None)
Residual excitations. If None, this will be synthesized with white noise
voiced_frames : ndarray or None, optional (default=None)
Voiced frames. If None, all frames assumed to be voiced.
window_step : int, optional (default=128)
The size (in samples) of the space between each window
emphasis : float, optional (default=0.9)
The emphasis coefficient to use for filtering
overlap_add : bool, optional (default=True)
What type of processing to use when joining windows
copy : bool, optional (default=True)
Whether to copy the input X or modify in place
Returns
-------
synthesized : ndarray
Sound vector synthesized from input arguments
References
----------
D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
"""
# TODO: Incorporate better synthesis from
# http://eecs.oregonstate.edu/education/docs/ece352/CompleteManual.pdf
window_size = 2 * window_step
[n_windows, order] = lp_coefficients.shape
n_points = (n_windows + 1) * window_step
n_excitation_points = n_points + window_step + window_step // 2
random_state = np.random.RandomState(1999)
if residual_excitation is None:
# Need to generate excitation
if voiced_frames is None:
# No voiced/unvoiced info
voiced_frames = np.ones((lp_coefficients.shape[0], 1))
residual_excitation = np.zeros((n_excitation_points))
f, m = lpc_to_frequency(lp_coefficients, per_frame_gain)
t = np.linspace(0, 1, window_size, endpoint=False)
hanning = sg.hanning(window_size)
for window in range(n_windows):
window_base = window * window_step
index = window_base + np.arange(window_size)
if voiced_frames[window]:
sig = np.zeros_like(t)
cycles = np.cumsum(f[window][0] * t)
sig += sg.sawtooth(cycles, 0.001)
residual_excitation[index] += hanning * sig
residual_excitation[index] += hanning * 0.01 * random_state.randn(
window_size)
else:
n_excitation_points = residual_excitation.shape[0]
n_points = n_excitation_points + window_step + window_step // 2
residual_excitation = np.hstack((residual_excitation,
np.zeros(window_size)))
if voiced_frames is None:
voiced_frames = np.ones_like(per_frame_gain)
synthesized = np.zeros((n_points))
for window in range(n_windows):
window_base = window * window_step
oldbit = synthesized[window_base + np.arange(window_step)]
w_coefs = lp_coefficients[window]
if not np.all(w_coefs):
# Hack to make lfilter avoid
# ValueError: BUG: filter coefficient a[0] == 0 not supported yet
# when all coeffs are 0
w_coefs = [1]
g_coefs = voiced_frames[window] * per_frame_gain[window]
index = window_base + np.arange(window_size)
newbit = g_coefs * sg.lfilter([1], w_coefs,
residual_excitation[index])
synthesized[index] = np.hstack((oldbit, np.zeros(
(window_size - window_step))))
synthesized[index] += sg.hanning(window_size) * newbit
synthesized = sg.lfilter([1], [1, -emphasis], synthesized)
return synthesized
def soundsc(X, gain_scale=.9, copy=True):
"""
Approximate implementation of soundsc from MATLAB without the audio playing.
Parameters
----------
X : ndarray
Signal to be rescaled
gain_scale : float
Gain multipler, default .9 (90% of maximum representation)
copy : bool, optional (default=True)
Whether to make a copy of input signal or operate in place.
Returns
-------
X_sc : ndarray
(-32767, 32767) scaled version of X as int16, suitable for writing
with scipy.io.wavfile
"""
X = np.array(X, copy=copy)
X = (X - X.min()) / (X.max() - X.min())
X = 2 * X - 1
X = gain_scale * X
X = X * 2 ** 15
return X.astype('int16')
def _wav2array(nchannels, sampwidth, data):
# wavio.py
# Author: Warren Weckesser
# License: BSD 3-Clause (http://opensource.org/licenses/BSD-3-Clause)
"""data must be the string containing the bytes from the wav file."""
num_samples, remainder = divmod(len(data), sampwidth * nchannels)
if remainder > 0:
raise ValueError('The length of data is not a multiple of '
'sampwidth * num_channels.')
if sampwidth > 4:
raise ValueError("sampwidth must not be greater than 4.")
if sampwidth == 3:
a = np.empty((num_samples, nchannels, 4), dtype=np.uint8)
raw_bytes = np.fromstring(data, dtype=np.uint8)
a[:, :, :sampwidth] = raw_bytes.reshape(-1, nchannels, sampwidth)
a[:, :, sampwidth:] = (a[:, :, sampwidth - 1:sampwidth] >> 7) * 255
result = a.view('<i4').reshape(a.shape[:-1])
else:
# 8 bit samples are stored as unsigned ints; others as signed ints.
dt_char = 'u' if sampwidth == 1 else 'i'
a = np.fromstring(data, dtype='<%s%d' % (dt_char, sampwidth))
result = a.reshape(-1, nchannels)
return result
def readwav(file):
# wavio.py
# Author: Warren Weckesser
# License: BSD 3-Clause (http://opensource.org/licenses/BSD-3-Clause)
"""
Read a wav file.
Returns the frame rate, sample width (in bytes) and a numpy array
containing the data.
This function does not read compressed wav files.
"""
wav = wave.open(file)
rate = wav.getframerate()
nchannels = wav.getnchannels()
sampwidth = wav.getsampwidth()
nframes = wav.getnframes()
data = wav.readframes(nframes)
wav.close()
array = _wav2array(nchannels, sampwidth, data)
return rate, sampwidth, array
def csvd(arr):
"""
Do the complex SVD of a 2D array, returning real valued U, S, VT
http://stemblab.github.io/complex-svd/
"""
C_r = arr.real
C_i = arr.imag
block_x = C_r.shape[0]
block_y = C_r.shape[1]
K = np.zeros((2 * block_x, 2 * block_y))
# Upper left
K[:block_x, :block_y] = C_r
# Lower left
K[:block_x, block_y:] = C_i
# Upper right
K[block_x:, :block_y] = -C_i
# Lower right
K[block_x:, block_y:] = C_r
return svd(K, full_matrices=False)
def icsvd(U, S, VT):
"""
Invert back to complex values from the output of csvd
U, S, VT = csvd(X)
X_rec = inv_csvd(U, S, VT)
"""
K = U.dot(np.diag(S)).dot(VT)
block_x = U.shape[0] // 2
block_y = U.shape[1] // 2
arr_rec = np.zeros((block_x, block_y)) + 0j
arr_rec.real = K[:block_x, :block_y]
arr_rec.imag = K[:block_x, block_y:]
return arr_rec
def sinusoid_analysis(X, input_sample_rate, resample_block=128, copy=True):
"""
Contruct a sinusoidal model for the input signal.
Parameters
----------
X : ndarray
Input signal to model
input_sample_rate : int
The sample rate of the input signal
resample_block : int, optional (default=128)
Controls the step size of the sinusoidal model
Returns
-------
frequencies_hz : ndarray
Frequencies for the sinusoids, in Hz.
magnitudes : ndarray
Magnitudes of sinusoids returned in ``frequencies``
References
----------
D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
"""
X = np.array(X, copy=copy)
resample_to = 8000
if input_sample_rate != resample_to:
if input_sample_rate % resample_to != 0:
raise ValueError("Input sample rate must be a multiple of 8k!")
# Should be able to use resample... ?
# resampled_count = round(len(X) * resample_to / input_sample_rate)
# X = sg.resample(X, resampled_count, window=sg.hanning(len(X)))
X = sg.decimate(X, input_sample_rate // resample_to, zero_phase=True)
step_size = 2 * round(resample_block / input_sample_rate * resample_to / 2.)
a, g, e = lpc_analysis(X, order=8, window_step=step_size,
window_size=2 * step_size)
f, m = lpc_to_frequency(a, g)
f_hz = f * resample_to / (2 * np.pi)
return f_hz, m
def slinterp(X, factor, copy=True):
"""
Slow-ish linear interpolation of a 1D numpy array. There must be some
better function to do this in numpy.
Parameters
----------
X : ndarray
1D input array to interpolate
factor : int
Integer factor to interpolate by
Return
------
X_r : ndarray
"""
sz = np.product(X.shape)
X = np.array(X, copy=copy)
X_s = np.hstack((X[1:], [0]))
X_r = np.zeros((factor, sz))
for i in range(factor):
X_r[i, :] = (factor - i) / float(factor) * X + (i / float(factor)) * X_s
return X_r.T.ravel()[:(sz - 1) * factor + 1]
def sinusoid_synthesis(frequencies_hz, magnitudes, input_sample_rate=16000,
resample_block=128):
"""
Create a time series based on input frequencies and magnitudes.
Parameters
----------
frequencies_hz : ndarray
Input signal to model
magnitudes : int
The sample rate of the input signal
input_sample_rate : int, optional (default=16000)
The sample rate parameter that the sinusoid analysis was run with
resample_block : int, optional (default=128)
Controls the step size of the sinusoidal model
Returns
-------
synthesized : ndarray
Sound vector synthesized from input arguments
References
----------
D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
"""
rows, cols = frequencies_hz.shape
synthesized = np.zeros((1 + ((rows - 1) * resample_block),))
for col in range(cols):
mags = slinterp(magnitudes[:, col], resample_block)
freqs = slinterp(frequencies_hz[:, col], resample_block)
cycles = np.cumsum(2 * np.pi * freqs / float(input_sample_rate))
sines = mags * np.cos(cycles)
synthesized += sines
return synthesized
def compress(X, n_components, window_size=128):
"""
Compress using the DCT
Parameters
----------
X : ndarray, shape=(n_samples,)
The input signal to compress. Should be 1-dimensional
n_components : int
The number of DCT components to keep. Setting n_components to about
.5 * window_size can give compression with fairly good reconstruction.
window_size : int
The input X is broken into windows of window_size, each of which are
then compressed with the DCT.
Returns
-------
X_compressed : ndarray, shape=(num_windows, window_size)
A 2D array of non-overlapping DCT coefficients. For use with uncompress
Reference
---------
http://nbviewer.ipython.org/github/craffel/crucialpython/blob/master/week3/stride_tricks.ipynb
"""
if len(X) % window_size != 0:
append = np.zeros((window_size - len(X) % window_size))
X = np.hstack((X, append))
num_frames = len(X) // window_size
X_strided = X.reshape((num_frames, window_size))
X_dct = fftpack.dct(X_strided, norm='ortho')
if n_components is not None:
X_dct = X_dct[:, :n_components]
return X_dct
def uncompress(X_compressed, window_size=128):
"""
Uncompress a DCT compressed signal (such as returned by ``compress``).
Parameters
----------
X_compressed : ndarray, shape=(n_samples, n_features)
Windowed and compressed array.
window_size : int, optional (default=128)
Size of the window used when ``compress`` was called.
Returns
-------
X_reconstructed : ndarray, shape=(n_samples)
Reconstructed version of X.
"""
if X_compressed.shape[1] % window_size != 0:
append = np.zeros((X_compressed.shape[0],
window_size - X_compressed.shape[1] % window_size))
X_compressed = np.hstack((X_compressed, append))
X_r = fftpack.idct(X_compressed, norm='ortho')
return X_r.ravel()
def sine_window(X):
"""
Apply a sinusoid window to X.
Parameters
----------
X : ndarray, shape=(n_samples, n_features)
Input array of samples
Returns
-------
X_windowed : ndarray, shape=(n_samples, n_features)
Windowed version of X.
"""
i = np.arange(X.shape[1])
win = np.sin(np.pi * (i + 0.5) / X.shape[1])
row_stride = 0
col_stride = win.itemsize
strided_win = as_strided(win, shape=X.shape,
strides=(row_stride, col_stride))
return X * strided_win
def kaiserbessel_window(X, alpha=6.5):
"""
Apply a Kaiser-Bessel window to X.
Parameters
----------
X : ndarray, shape=(n_samples, n_features)
Input array of samples
alpha : float, optional (default=6.5)
Tuning parameter for Kaiser-Bessel function. alpha=6.5 should make
perfect reconstruction possible for DCT.
Returns
-------
X_windowed : ndarray, shape=(n_samples, n_features)
Windowed version of X.
"""
beta = np.pi * alpha
win = sg.kaiser(X.shape[1], beta)
row_stride = 0
col_stride = win.itemsize
strided_win = as_strided(win, shape=X.shape,
strides=(row_stride, col_stride))
return X * strided_win
def overlap(X, window_size, window_step):
"""
Create an overlapped version of X
Parameters
----------
X : ndarray, shape=(n_samples,)
Input signal to window and overlap
window_size : int
Size of windows to take
window_step : int
Step size between windows
Returns
-------
X_strided : shape=(n_windows, window_size)
2D array of overlapped X
"""
if window_size % 2 != 0:
raise ValueError("Window size must be even!")
# Make sure there are an even number of windows before stridetricks
append = np.zeros((window_size - len(X) % window_size))
X = np.hstack((X, append))
overlap_sz = window_size - window_step
new_shape = X.shape[:-1] + ((X.shape[-1] - overlap_sz) // window_step, window_size)
new_strides = X.strides[:-1] + (window_step * X.strides[-1],) + X.strides[-1:]
X_strided = as_strided(X, shape=new_shape, strides=new_strides)
return X_strided
def halfoverlap(X, window_size):
"""
Create an overlapped version of X using 50% of window_size as overlap.
Parameters
----------
X : ndarray, shape=(n_samples,)
Input signal to window and overlap
window_size : int
Size of windows to take
Returns
-------
X_strided : shape=(n_windows, window_size)
2D array of overlapped X
"""
if window_size % 2 != 0:
raise ValueError("Window size must be even!")
window_step = window_size // 2
# Make sure there are an even number of windows before stridetricks
append = np.zeros((window_size - len(X) % window_size))
X = np.hstack((X, append))
num_frames = len(X) // window_step - 1
row_stride = X.itemsize * window_step
col_stride = X.itemsize
X_strided = as_strided(X, shape=(num_frames, window_size),
strides=(row_stride, col_stride))
return X_strided
def invert_halfoverlap(X_strided):
"""
Invert ``halfoverlap`` function to reconstruct X
Parameters
----------
X_strided : ndarray, shape=(n_windows, window_size)
X as overlapped windows
Returns
-------
X : ndarray, shape=(n_samples,)
Reconstructed version of X
"""
# Hardcoded 50% overlap! Can generalize later...
n_rows, n_cols = X_strided.shape
X = np.zeros((((int(n_rows // 2) + 1) * n_cols),)).astype(X_strided.dtype)
start_index = 0
end_index = n_cols
window_step = n_cols // 2
for row in range(X_strided.shape[0]):
X[start_index:end_index] += X_strided[row]
start_index += window_step
end_index += window_step
return X
def overlap_add(X_strided, window_step, wsola=False):
"""
overlap add to reconstruct X
Parameters
----------
X_strided : ndarray, shape=(n_windows, window_size)
X as overlapped windows
window_step : int
step size for overlap add
Returns
-------
X : ndarray, shape=(n_samples,)
Reconstructed version of X
"""
n_rows, window_size = X_strided.shape
# Start with largest size (no overlap) then truncate after we finish
# +2 for one window on each side
X = np.zeros(((n_rows + 2) * window_size,)).astype(X_strided.dtype)
start_index = 0
total_windowing_sum = np.zeros((X.shape[0]))
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(window_size) / (
window_size - 1))
for i in range(n_rows):
end_index = start_index + window_size
if wsola:
offset_size = window_size - window_step
offset = xcorr_offset(X[start_index:start_index + offset_size],
X_strided[i, :offset_size])
ss = start_index - offset
st = end_index - offset
if start_index - offset < 0:
ss = 0
st = 0 + (end_index - start_index)
X[ss:st] += X_strided[i]
total_windowing_sum[ss:st] += win
start_index = start_index + window_step
else:
X[start_index:end_index] += X_strided[i]
total_windowing_sum[start_index:end_index] += win
start_index += window_step
# Not using this right now
#X = np.real(X) / (total_windowing_sum + 1)
X = X[:end_index]
return X
def overlap_compress(X, n_components, window_size):
"""
Overlap (at 50% of window_size) and compress X.
Parameters
----------
X : ndarray, shape=(n_samples,)
Input signal to compress
n_components : int
number of DCT components to keep
window_size : int
Size of windows to take
Returns
-------
X_dct : ndarray, shape=(n_windows, n_components)
Windowed and compressed version of X
"""
X_strided = halfoverlap(X, window_size)
X_dct = fftpack.dct(X_strided, norm='ortho')
if n_components is not None:
X_dct = X_dct[:, :n_components]
return X_dct
# Evil voice is caused by adding double the zeros before inverse DCT...
# Very cool bug but makes sense
def overlap_uncompress(X_compressed, window_size):
"""
Uncompress X as returned from ``overlap_compress``.
Parameters
----------
X_compressed : ndarray, shape=(n_windows, n_components)
Windowed and compressed version of X
window_size : int
Size of windows originally used when compressing X
Returns
-------
X_reconstructed : ndarray, shape=(n_samples,)
Reconstructed version of X
"""
if X_compressed.shape[1] % window_size != 0:
append = np.zeros((X_compressed.shape[0], window_size -
X_compressed.shape[1] % window_size))
X_compressed = np.hstack((X_compressed, append))
X_r = fftpack.idct(X_compressed, norm='ortho')
return invert_halfoverlap(X_r)
def herz_to_mel(freqs):
"""
Based on code by Dan Ellis
http://labrosa.ee.columbia.edu/matlab/tf_agc/
"""
f_0 = 0 # 133.33333
f_sp = 200 / 3. # 66.66667
bark_freq = 1000.
bark_pt = (bark_freq - f_0) / f_sp
# The magic 1.0711703 which is the ratio needed to get from 1000 Hz
# to 6400 Hz in 27 steps, and is *almost* the ratio between 1000 Hz
# and the preceding linear filter center at 933.33333 Hz
# (actually 1000/933.33333 = 1.07142857142857 and
# exp(log(6.4)/27) = 1.07117028749447)
if not isinstance(freqs, np.ndarray):
freqs = np.array(freqs)[None]
log_step = np.exp(np.log(6.4) / 27)
lin_pts = (freqs < bark_freq)
mel = 0. * freqs
mel[lin_pts] = (freqs[lin_pts] - f_0) / f_sp
mel[~lin_pts] = bark_pt + np.log(freqs[~lin_pts] / bark_freq) / np.log(
log_step)
return mel
def mel_to_herz(mel):
"""
Based on code by Dan Ellis
http://labrosa.ee.columbia.edu/matlab/tf_agc/
"""
f_0 = 0 # 133.33333
f_sp = 200 / 3. # 66.66667
bark_freq = 1000.
bark_pt = (bark_freq - f_0) / f_sp
# The magic 1.0711703 which is the ratio needed to get from 1000 Hz
# to 6400 Hz in 27 steps, and is *almost* the ratio between 1000 Hz
# and the preceding linear filter center at 933.33333 Hz
# (actually 1000/933.33333 = 1.07142857142857 and
# exp(log(6.4)/27) = 1.07117028749447)
if not isinstance(mel, np.ndarray):
mel = np.array(mel)[None]
log_step = np.exp(np.log(6.4) / 27)
lin_pts = (mel < bark_pt)
freqs = 0. * mel
freqs[lin_pts] = f_0 + f_sp * mel[lin_pts]
freqs[~lin_pts] = bark_freq * np.exp(np.log(log_step) * (
mel[~lin_pts] - bark_pt))
return freqs
def mel_freq_weights(n_fft, fs, n_filts=None, width=None):
"""
Based on code by Dan Ellis
http://labrosa.ee.columbia.edu/matlab/tf_agc/
"""
min_freq = 0
max_freq = fs // 2
if width is None:
width = 1.
if n_filts is None:
n_filts = int(herz_to_mel(max_freq) / 2) + 1
else:
n_filts = int(n_filts)
assert n_filts > 0
weights = np.zeros((n_filts, n_fft))
fft_freqs = np.arange(n_fft // 2) / n_fft * fs
min_mel = herz_to_mel(min_freq)
max_mel = herz_to_mel(max_freq)
partial = np.arange(n_filts + 2) / (n_filts + 1.) * (max_mel - min_mel)
bin_freqs = mel_to_herz(min_mel + partial)
bin_bin = np.round(bin_freqs / fs * (n_fft - 1))
for i in range(n_filts):
fs_i = bin_freqs[i + np.arange(3)]
fs_i = fs_i[1] + width * (fs_i - fs_i[1])
lo_slope = (fft_freqs - fs_i[0]) / float(fs_i[1] - fs_i[0])
hi_slope = (fs_i[2] - fft_freqs) / float(fs_i[2] - fs_i[1])
weights[i, :n_fft // 2] = np.maximum(
0, np.minimum(lo_slope, hi_slope))
# Constant amplitude multiplier
weights = np.diag(2. / (bin_freqs[2:n_filts + 2]
- bin_freqs[:n_filts])).dot(weights)
weights[:, n_fft // 2:] = 0
return weights
def time_attack_agc(X, fs, t_scale=0.5, f_scale=1.):
"""
AGC based on code by Dan Ellis
http://labrosa.ee.columbia.edu/matlab/tf_agc/
"""
# 32 ms grid for FFT
n_fft = 2 ** int(np.log(0.032 * fs) / np.log(2))
f_scale = float(f_scale)
window_size = n_fft
window_step = window_size // 2
X_freq = stft(X, window_size, mean_normalize=False)
fft_fs = fs / window_step
n_bands = max(10, 20 / f_scale)
mel_width = f_scale * n_bands / 10.
f_to_a = mel_freq_weights(n_fft, fs, n_bands, mel_width)
f_to_a = f_to_a[:, :n_fft // 2 + 1]
audiogram = np.abs(X_freq).dot(f_to_a.T)
fbg = np.zeros_like(audiogram)
state = np.zeros((audiogram.shape[1],))
alpha = np.exp(-(1. / fft_fs) / t_scale)
for i in range(len(audiogram)):
state = np.maximum(alpha * state, audiogram[i])
fbg[i] = state
sf_to_a = np.sum(f_to_a, axis=0)
E = np.diag(1. / (sf_to_a + (sf_to_a == 0)))
E = E.dot(f_to_a.T)
E = fbg.dot(E.T)
E[E <= 0] = np.min(E[E > 0])
ts = istft(X_freq / E, window_size, mean_normalize=False)
return ts, X_freq, E
def hebbian_kmeans(X, n_clusters=10, n_epochs=10, W=None, learning_rate=0.01,
batch_size=100, random_state=None, verbose=True):
"""
Modified from existing code from R. Memisevic
See http://www.cs.toronto.edu/~rfm/code/hebbian_kmeans.py
"""
if W is None:
if random_state is None:
random_state = np.random.RandomState()
W = 0.1 * random_state.randn(n_clusters, X.shape[1])
else:
assert n_clusters == W.shape[0]
X2 = (X ** 2).sum(axis=1, keepdims=True)
last_print = 0
for e in range(n_epochs):
for i in range(0, X.shape[0], batch_size):
X_i = X[i: i + batch_size]
X2_i = X2[i: i + batch_size]
D = -2 * np.dot(W, X_i.T)
D += (W ** 2).sum(axis=1, keepdims=True)
D += X2_i.T
S = (D == D.min(axis=0)[None, :]).astype("float").T
W += learning_rate * (
np.dot(S.T, X_i) - S.sum(axis=0)[:, None] * W)
if verbose:
if e == 0 or e > (.05 * n_epochs + last_print):
last_print = e
print("Epoch %i of %i, cost %.4f" % (
e + 1, n_epochs, D.min(axis=0).sum()))
return W
def complex_to_real_view(arr_c):
# Inplace view from complex to r, i as separate columns
assert arr_c.dtype in [np.complex64, np.complex128]
shp = arr_c.shape
dtype = np.float64 if arr_c.dtype == np.complex128 else np.float32
arr_r = arr_c.ravel().view(dtype=dtype).reshape(shp[0], 2 * shp[1])
return arr_r
def real_to_complex_view(arr_r):
# Inplace view from real, image as columns to complex
assert arr_r.dtype not in [np.complex64, np.complex128]
shp = arr_r.shape
dtype = np.complex128 if arr_r.dtype == np.float64 else np.complex64
arr_c = arr_r.ravel().view(dtype=dtype).reshape(shp[0], shp[1] // 2)
return arr_c
def complex_to_abs(arr_c):
return np.abs(arr_c)
def complex_to_angle(arr_c):
return np.angle(arr_c)
def abs_and_angle_to_complex(arr_abs, arr_angle):
# abs(f_c2 - f_c) < 1E-15
return arr_abs * np.exp(1j * arr_angle)
def angle_to_sin_cos(arr_angle):
return np.hstack((np.sin(arr_angle), np.cos(arr_angle)))
def sin_cos_to_angle(arr_sin, arr_cos):
return np.arctan2(arr_sin, arr_cos)
def polyphase_core(x, m, f):
# x = input data
# m = decimation rate
# f = filter
# Hack job - append zeros to match decimation rate
if x.shape[0] % m != 0:
x = np.append(x, np.zeros((m - x.shape[0] % m,)))
if f.shape[0] % m != 0:
f = np.append(f, np.zeros((m - f.shape[0] % m,)))
polyphase = p = np.zeros((m, (x.shape[0] + f.shape[0]) / m), dtype=x.dtype)
p[0, :-1] = np.convolve(x[::m], f[::m])
# Invert the x values when applying filters
for i in range(1, m):
p[i, 1:] = np.convolve(x[m - i::m], f[i::m])
return p
def polyphase_single_filter(x, m, f):
return np.sum(polyphase_core(x, m, f), axis=0)
def polyphase_lowpass(arr, downsample=2, n_taps=50, filter_pad=1.1):
filt = firwin(downsample * n_taps, 1 / (downsample * filter_pad))
filtered = polyphase_single_filter(arr, downsample, filt)
return filtered
def window(arr, window_size, window_step=1, axis=0):
"""
Directly taken from Erik Rigtorp's post to numpy-discussion.
<http://www.mail-archive.com/numpy-discussion@scipy.org/msg29450.html>
<http://stackoverflow.com/questions/4936620/using-strides-for-an-efficient-moving-average-filter>
"""
if window_size < 1:
raise ValueError("`window_size` must be at least 1.")
if window_size > arr.shape[-1]:
raise ValueError("`window_size` is too long.")
orig = list(range(len(arr.shape)))
trans = list(range(len(arr.shape)))
trans[axis] = orig[-1]
trans[-1] = orig[axis]
arr = arr.transpose(trans)
shape = arr.shape[:-1] + (arr.shape[-1] - window_size + 1, window_size)
strides = arr.strides + (arr.strides[-1],)
strided = as_strided(arr, shape=shape, strides=strides)
if window_step > 1:
strided = strided[..., ::window_step, :]
orig = list(range(len(strided.shape)))
trans = list(range(len(strided.shape)))
trans[-2] = orig[-1]
trans[-1] = orig[-2]
trans = trans[::-1]
strided = strided.transpose(trans)
return strided
def unwindow(arr, window_size, window_step=1, axis=0):
# undo windows by broadcast
if axis != 0:
raise ValueError("axis != 0 currently unsupported")
shp = arr.shape
unwindowed = np.tile(arr[:, None, ...], (1, window_step, 1, 1))
unwindowed = unwindowed.reshape(shp[0] * window_step, *shp[1:])
return unwindowed.mean(axis=1)
def xcorr_offset(x1, x2):
"""
Under MSR-LA License
Based on MATLAB implementation from Spectrogram Inversion Toolbox
References
----------
D. Griffin and J. Lim. Signal estimation from modified
short-time Fourier transform. IEEE Trans. Acoust. Speech
Signal Process., 32(2):236-243, 1984.
Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory
Model Inversion for Sound Separation. Proc. IEEE-ICASSP,
Adelaide, 1994, II.77-80.
Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal
Estimation from Modified Short-Time Fourier Transform
Magnitude Spectra. IEEE Transactions on Audio Speech and
Language Processing, 08/2007.
"""
x1 = x1 - x1.mean()
x2 = x2 - x2.mean()
frame_size = len(x2)
half = frame_size // 2
corrs = np.convolve(x1.astype('float32'), x2[::-1].astype('float32'))
corrs[:half] = -1E30
corrs[-half:] = -1E30
offset = corrs.argmax() - len(x1)
return offset
def invert_spectrogram(X_s, step, calculate_offset=True, set_zero_phase=True):
"""
Under MSR-LA License
Based on MATLAB implementation from Spectrogram Inversion Toolbox
References
----------
D. Griffin and J. Lim. Signal estimation from modified
short-time Fourier transform. IEEE Trans. Acoust. Speech
Signal Process., 32(2):236-243, 1984.
Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory
Model Inversion for Sound Separation. Proc. IEEE-ICASSP,
Adelaide, 1994, II.77-80.
Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal
Estimation from Modified Short-Time Fourier Transform
Magnitude Spectra. IEEE Transactions on Audio Speech and
Language Processing, 08/2007.
"""
size = int(X_s.shape[1] // 2)
wave = np.zeros((X_s.shape[0] * step + size))
# Getting overflow warnings with 32 bit...
wave = wave.astype('float64')
total_windowing_sum = np.zeros((X_s.shape[0] * step + size))
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
est_start = int(size // 2) - 1
est_end = est_start + size
for i in range(X_s.shape[0]):
wave_start = int(step * i)
wave_end = wave_start + size
if set_zero_phase:
spectral_slice = X_s[i].real + 0j
else:
# already complex
spectral_slice = X_s[i]
# Don't need fftshift due to different impl.
wave_est = np.real(np.fft.ifft(spectral_slice))[::-1]
if calculate_offset and i > 0:
offset_size = size - step
if offset_size <= 0:
print("WARNING: Large step size >50\% detected! "
"This code works best with high overlap - try "
"with 75% or greater")
offset_size = step
offset = xcorr_offset(wave[wave_start:wave_start + offset_size],
wave_est[est_start:est_start + offset_size])
else:
offset = 0
wave[wave_start:wave_end] += win * wave_est[
est_start - offset:est_end - offset]
total_windowing_sum[wave_start:wave_end] += win
wave = np.real(wave) / (total_windowing_sum + 1E-6)
return wave
def iterate_invert_spectrogram(X_s, fftsize, step, n_iter=10, verbose=False,
complex_input=False):
"""
Under MSR-LA License
Based on MATLAB implementation from Spectrogram Inversion Toolbox
References
----------
D. Griffin and J. Lim. Signal estimation from modified
short-time Fourier transform. IEEE Trans. Acoust. Speech
Signal Process., 32(2):236-243, 1984.
Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory
Model Inversion for Sound Separation. Proc. IEEE-ICASSP,
Adelaide, 1994, II.77-80.
Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal
Estimation from Modified Short-Time Fourier Transform
Magnitude Spectra. IEEE Transactions on Audio Speech and
Language Processing, 08/2007.
"""
reg = np.max(X_s) / 1E8
X_best = copy.deepcopy(X_s)
try:
for i in range(n_iter):
if verbose:
print("Runnning iter %i" % i)
if i == 0 and not complex_input:
X_t = invert_spectrogram(X_best, step, calculate_offset=True,
set_zero_phase=True)
else:
# Calculate offset was False in the MATLAB version
# but in mine it massively improves the result
# Possible bug in my impl?
X_t = invert_spectrogram(X_best, step, calculate_offset=True,
set_zero_phase=False)
est = stft(X_t, fftsize=fftsize, step=step, compute_onesided=False)
phase = est / np.maximum(reg, np.abs(est))
phase = phase[:len(X_s)]
X_s = X_s[:len(phase)]
X_best = X_s * phase
except ValueError:
raise ValueError("The iterate_invert_spectrogram algorithm requires"
" stft(..., compute_onesided=False),",
" be sure you have calculated stft with this argument")
X_t = invert_spectrogram(X_best, step, calculate_offset=True,
set_zero_phase=False)
return np.real(X_t)
def harvest_get_downsampled_signal(x, fs, target_fs):
decimation_ratio = np.round(fs / target_fs)
offset = np.ceil(140. / decimation_ratio) * decimation_ratio
start_pad = x[0] * np.ones(int(offset), dtype=np.float32)
end_pad = x[-1] * np.ones(int(offset), dtype=np.float32)
x = np.concatenate((start_pad, x, end_pad), axis=0)
if fs < target_fs:
raise ValueError("CASE NOT HANDLED IN harvest_get_downsampled_signal")
else:
try:
y0 = sg.decimate(x, int(decimation_ratio), 3, zero_phase=True)
except:
y0 = sg.decimate(x, int(decimation_ratio), 3)
actual_fs = fs / decimation_ratio
y = y0[int(offset / decimation_ratio):-int(offset / decimation_ratio)]
y = y - np.mean(y)
return y, actual_fs
def harvest_get_raw_f0_candidates(number_of_frames, boundary_f0_list,
y_length, temporal_positions, actual_fs, y_spectrum, f0_floor,
f0_ceil):
raw_f0_candidates = np.zeros((len(boundary_f0_list), number_of_frames), dtype=np.float32)
for i in range(len(boundary_f0_list)):
raw_f0_candidates[i, :] = harvest_get_f0_candidate_from_raw_event(
boundary_f0_list[i], actual_fs, y_spectrum, y_length,
temporal_positions, f0_floor, f0_ceil)
return raw_f0_candidates
def harvest_nuttall(N):
t = np.arange(0, N) * 2 * np.pi / (N - 1)
coefs = np.array([0.355768, -0.487396, 0.144232, -0.012604])
window = np.cos(t[:, None].dot(np.array([0., 1., 2., 3.])[None])).dot( coefs[:, None])
# 1D window...
return window.ravel()
def harvest_get_f0_candidate_from_raw_event(boundary_f0,
fs, y_spectrum, y_length, temporal_positions, f0_floor,
f0_ceil):
filter_length_half = int(np.round(fs / boundary_f0 * 2))
band_pass_filter_base = harvest_nuttall(filter_length_half * 2 + 1)
shifter = np.cos(2 * np.pi * boundary_f0 * np.arange(-filter_length_half, filter_length_half + 1) / float(fs))
band_pass_filter = band_pass_filter_base * shifter
index_bias = filter_length_half
# possible numerical issues if 32 bit
spectrum_low_pass_filter = np.fft.fft(band_pass_filter.astype("float64"), len(y_spectrum))
filtered_signal = np.real(np.fft.ifft(spectrum_low_pass_filter * y_spectrum))
index_bias = filter_length_half + 1
filtered_signal = filtered_signal[index_bias + np.arange(y_length).astype("int32")]
negative_zero_cross = harvest_zero_crossing_engine(filtered_signal, fs)
positive_zero_cross = harvest_zero_crossing_engine(-filtered_signal, fs)
d_filtered_signal = filtered_signal[1:] - filtered_signal[:-1]
peak = harvest_zero_crossing_engine(d_filtered_signal, fs)
dip = harvest_zero_crossing_engine(-d_filtered_signal, fs)
f0_candidate = harvest_get_f0_candidate_contour(negative_zero_cross,
positive_zero_cross, peak, dip, temporal_positions)
f0_candidate[f0_candidate > (boundary_f0 * 1.1)] = 0.
f0_candidate[f0_candidate < (boundary_f0 * .9)] = 0.
f0_candidate[f0_candidate > f0_ceil] = 0.
f0_candidate[f0_candidate < f0_floor] = 0.
return f0_candidate
def harvest_get_f0_candidate_contour(negative_zero_cross_tup,
positive_zero_cross_tup, peak_tup, dip_tup, temporal_positions):
# 0 is inteval locations
# 1 is interval based f0
usable_channel = max(0, len(negative_zero_cross_tup[0]) - 2)
usable_channel *= max(0, len(positive_zero_cross_tup[0]) - 2)
usable_channel *= max(0, len(peak_tup[0]) - 2)
usable_channel *= max(0, len(dip_tup[0]) - 2)
if usable_channel > 0:
interpolated_f0_list = np.zeros((4, len(temporal_positions)))
nz = interp1d(negative_zero_cross_tup[0], negative_zero_cross_tup[1],
kind="linear", bounds_error=False, fill_value="extrapolate")
pz = interp1d(positive_zero_cross_tup[0], positive_zero_cross_tup[1],
kind="linear", bounds_error=False, fill_value="extrapolate")
pkz = interp1d(peak_tup[0], peak_tup[1],
kind="linear", bounds_error=False, fill_value="extrapolate")
dz = interp1d(dip_tup[0], dip_tup[1],
kind="linear", bounds_error=False, fill_value="extrapolate")
interpolated_f0_list[0, :] = nz(temporal_positions)
interpolated_f0_list[1, :] = pz(temporal_positions)
interpolated_f0_list[2, :] = pkz(temporal_positions)
interpolated_f0_list[3, :] = dz(temporal_positions)
f0_candidate = np.mean(interpolated_f0_list, axis=0)
else:
f0_candidate = temporal_positions * 0
return f0_candidate
def harvest_zero_crossing_engine(x, fs, debug=False):
# negative zero crossing, going from positive to negative
x_shift = x.copy()
x_shift[:-1] = x_shift[1:]
x_shift[-1] = x[-1]
# +1 here to avoid edge case at 0
points = np.arange(len(x)) + 1
negative_going_points = points * ((x_shift * x < 0) * (x_shift < x))
edge_list = negative_going_points[negative_going_points > 0]
# -1 to correct index
fine_edge_list = edge_list - x[edge_list - 1] / (x[edge_list] - x[edge_list - 1]).astype("float32")
interval_locations = (fine_edge_list[:-1] + fine_edge_list[1:]) / float(2) / fs
interval_based_f0 = float(fs) / (fine_edge_list[1:] - fine_edge_list[:-1])
return interval_locations, interval_based_f0
def harvest_detect_official_f0_candidates(raw_f0_candidates):
number_of_channels, number_of_frames = raw_f0_candidates.shape
f0_candidates = np.zeros((int(np.round(number_of_channels / 10.)), number_of_frames))
number_of_candidates = 0
threshold = 10
for i in range(number_of_frames):
tmp = raw_f0_candidates[:, i].copy()
tmp[tmp > 0] = 1.
tmp[0] = 0
tmp[-1] = 0
tmp = tmp[1:] - tmp[:-1]
st = np.where(tmp == 1)[0]
ed = np.where(tmp == -1)[0]
count = 0
for j in range(len(st)):
dif = ed[j] - st[j]
if dif >= threshold:
tmp_f0 = raw_f0_candidates[st[j] + 1: ed[j] + 1, i]
f0_candidates[count, i] = np.mean(tmp_f0)
count = count + 1
number_of_candidates = max(number_of_candidates, count)
return f0_candidates, number_of_candidates
def harvest_overlap_f0_candidates(f0_candidates, max_number_of_f0_candidates):
n = 3 # this is the optimized parameter... apparently
number_of_candidates = n * 2 + 1
new_f0_candidates = f0_candidates[number_of_candidates, :].copy()
new_f0_candidates = new_f0_candidates[None]
# hack to bypass magic matlab-isms of allocating when indexing OOB
new_f0_candidates = np.vstack([new_f0_candidates] + (new_f0_candidates.shape[-1] - 1) * [np.zeros_like(new_f0_candidates)])
# this indexing is megagross, possible source for bugs!
all_nonzero = []
for i in range(number_of_candidates):
st = max(-(i - n), 0)
ed = min(-(i - n), 0)
f1_b = np.arange(max_number_of_f0_candidates).astype("int32")
f1 = f1_b + int(i * max_number_of_f0_candidates)
all_nonzero = list(set(all_nonzero + list(f1)))
f2 = None if ed == 0 else ed
f3 = -ed
f4 = None if st == 0 else -st
new_f0_candidates[f1, st:f2] = f0_candidates[f1_b, f3:f4]
new_f0_candidates = new_f0_candidates[all_nonzero, :]
return new_f0_candidates
def harvest_refine_candidates(x, fs, temporal_positions, f0_candidates,
f0_floor, f0_ceil):
new_f0_candidates = f0_candidates.copy()
f0_scores = f0_candidates * 0.
for i in range(len(temporal_positions)):
for j in range(len(f0_candidates)):
tmp_f0 = f0_candidates[j, i]
if tmp_f0 == 0:
continue
res = harvest_get_refined_f0(x, fs, temporal_positions[i],
tmp_f0, f0_floor, f0_ceil)
new_f0_candidates[j, i] = res[0]
f0_scores[j, i] = res[1]
return new_f0_candidates, f0_scores
def harvest_get_refined_f0(x, fs, current_time, current_f0, f0_floor,
f0_ceil):
half_window_length = np.ceil(3. * fs / current_f0 / 2.)
window_length_in_time = (2. * half_window_length + 1) / float(fs)
base_time = np.arange(-half_window_length, half_window_length + 1) / float(fs)
fft_size = int(2 ** np.ceil(np.log2((half_window_length * 2 + 1)) + 1))
frequency_axis = np.arange(fft_size) / fft_size * float(fs)
base_index = np.round((current_time + base_time) * fs + 0.001)
index_time = (base_index - 1) / float(fs)
window_time = index_time - current_time
part1 = np.cos(2 * np.pi * window_time / window_length_in_time)
part2 = np.cos(4 * np.pi * window_time / window_length_in_time)
main_window = 0.42 + 0.5 * part1 + 0.08 * part2
ext = np.zeros((len(main_window) + 2))
ext[1:-1] = main_window
diff_window = -((ext[1:-1] - ext[:-2]) + (ext[2:] - ext[1:-1])) / float(2)
safe_index = np.maximum(1, np.minimum(len(x), base_index)).astype("int32") - 1
spectrum = np.fft.fft(x[safe_index] * main_window, fft_size)
diff_spectrum = np.fft.fft(x[safe_index] * diff_window, fft_size)
numerator_i = np.real(spectrum) * np.imag(diff_spectrum) - np.imag(spectrum) * np.real(diff_spectrum)
power_spectrum = np.abs(spectrum) ** 2
instantaneous_frequency = frequency_axis + numerator_i / power_spectrum * float(fs) / 2. / np.pi
number_of_harmonics = int(min(np.floor(float(fs) / 2. / current_f0), 6.))
harmonics_index = np.arange(number_of_harmonics) + 1
index_list = np.round(current_f0 * fft_size / fs * harmonics_index).astype("int32")
instantaneous_frequency_list = instantaneous_frequency[index_list]
amplitude_list = np.sqrt(power_spectrum[index_list])
refined_f0 = np.sum(amplitude_list * instantaneous_frequency_list)
refined_f0 /= np.sum(amplitude_list * harmonics_index.astype("float32"))
variation = np.abs(((instantaneous_frequency_list / harmonics_index.astype("float32")) - current_f0) / float(current_f0))
refined_score = 1. / (0.000000000001 + np.mean(variation))
if (refined_f0 < f0_floor) or (refined_f0 > f0_ceil) or (refined_score < 2.5):
refined_f0 = 0.
redined_score = 0.
return refined_f0, refined_score
def harvest_select_best_f0(reference_f0, f0_candidates, allowed_range):
best_f0 = 0
best_error = allowed_range
for i in range(len(f0_candidates)):
tmp = np.abs(reference_f0 - f0_candidates[i]) / reference_f0
if tmp > best_error:
continue
best_f0 = f0_candidates[i]
best_error = tmp
return best_f0, best_error
def harvest_remove_unreliable_candidates(f0_candidates, f0_scores):
new_f0_candidates = f0_candidates.copy()
new_f0_scores = f0_scores.copy()
threshold = 0.05
f0_length = f0_candidates.shape[1]
number_of_candidates = len(f0_candidates)
for i in range(1, f0_length - 1):
for j in range(number_of_candidates):
reference_f0 = f0_candidates[j, i]
if reference_f0 == 0:
continue
_, min_error1 = harvest_select_best_f0(reference_f0, f0_candidates[:, i + 1], 1)
_, min_error2 = harvest_select_best_f0(reference_f0, f0_candidates[:, i - 1], 1)
min_error = min([min_error1, min_error2])
if min_error > threshold:
new_f0_candidates[j, i] = 0
new_f0_scores[j, i] = 0
return new_f0_candidates, new_f0_scores
def harvest_search_f0_base(f0_candidates, f0_scores):
f0_base = f0_candidates[0, :] * 0.
for i in range(len(f0_base)):
max_index = np.argmax(f0_scores[:, i])
f0_base[i] = f0_candidates[max_index, i]
return f0_base
def harvest_fix_step_1(f0_base, allowed_range):
# Step 1: Rapid change of f0 contour is replaced by 0
f0_step1 = f0_base.copy()
f0_step1[0] = 0.
f0_step1[1] = 0.
for i in range(2, len(f0_base)):
if f0_base[i] == 0:
continue
reference_f0 = f0_base[i - 1] * 2 - f0_base[i - 2]
c1 = np.abs((f0_base[i] - reference_f0) / reference_f0) > allowed_range
c2 = np.abs((f0_base[i] - f0_base[i - 1]) / f0_base[i - 1]) > allowed_range
if c1 and c2:
f0_step1[i] = 0.
return f0_step1
def harvest_fix_step_2(f0_step1, voice_range_minimum):
f0_step2 = f0_step1.copy()
boundary_list = harvest_get_boundary_list(f0_step1)
for i in range(1, int(len(boundary_list) / 2.) + 1):
distance = boundary_list[(2 * i) - 1] - boundary_list[(2 * i) - 2]
if distance < voice_range_minimum:
# need one more due to range not including last index
lb = boundary_list[(2 * i) - 2]
ub = boundary_list[(2 * i) - 1] + 1
f0_step2[lb:ub] = 0.
return f0_step2
def harvest_fix_step_3(f0_step2, f0_candidates, allowed_range, f0_scores):
f0_step3 = f0_step2.copy()
boundary_list = harvest_get_boundary_list(f0_step2)
multichannel_f0 = harvest_get_multichannel_f0(f0_step2, boundary_list)
rrange = np.zeros((int(len(boundary_list) / 2), 2))
threshold1 = 100
threshold2 = 2200
count = 0
for i in range(1, int(len(boundary_list) / 2) + 1):
extended_f0, tmp_range_1 = harvest_extend_f0(multichannel_f0[i - 1, :],
boundary_list[(2 * i) - 1],
min([len(f0_step2) - 1, boundary_list[(2 * i) - 1] + threshold1]),
1, f0_candidates, allowed_range)
tmp_f0_sequence, tmp_range_0 = harvest_extend_f0(extended_f0,
boundary_list[(2 * i) - 2],
max([2, boundary_list[(2 * i) - 2] - threshold1]), -1,
f0_candidates, allowed_range)
mean_f0 = np.mean(tmp_f0_sequence[tmp_range_0 : tmp_range_1 + 1])
if threshold2 / mean_f0 < (tmp_range_1 - tmp_range_0):
multichannel_f0[count, :] = tmp_f0_sequence
rrange[count, :] = np.array([tmp_range_0, tmp_range_1])
count = count + 1
if count > 0:
multichannel_f0 = multichannel_f0[:count, :]
rrange = rrange[:count, :]
f0_step3 = harvest_merge_f0(multichannel_f0, rrange, f0_candidates,
f0_scores)
return f0_step3
def harvest_merge_f0(multichannel_f0, rrange, f0_candidates, f0_scores):
number_of_channels = len(multichannel_f0)
sorted_order = np.argsort(rrange[:, 0])
f0 = multichannel_f0[sorted_order[0], :]
for i in range(1, number_of_channels):
if rrange[sorted_order[i], 0] - rrange[sorted_order[0], 1] > 0:
# no overlapping
f0[rrange[sorted_order[i], 0]:rrange[sorted_order[i], 1]] = multichannel_f0[sorted_order[i], rrange[sorted_order[i], 0]:rrange[sorted_order[i], 1]]
cp = rrange.copy()
rrange[sorted_order[0], 0] = cp[sorted_order[i], 0]
rrange[sorted_order[0], 1] = cp[sorted_order[i], 1]
else:
cp = rrange.copy()
res = harvest_merge_f0_sub(f0, cp[sorted_order[0], 0],
cp[sorted_order[0], 1],
multichannel_f0[sorted_order[i], :],
cp[sorted_order[i], 0],
cp[sorted_order[i], 1], f0_candidates, f0_scores)
f0 = res[0]
rrange[sorted_order[0], 1] = res[1]
return f0
def harvest_merge_f0_sub(f0_1, st1, ed1, f0_2, st2, ed2, f0_candidates,
f0_scores):
merged_f0 = f0_1
if (st1 <= st2) and (ed1 >= ed2):
new_ed = ed1
return merged_f0, new_ed
new_ed = ed2
score1 = 0.
score2 = 0.
for i in range(int(st2), int(ed1) + 1):
score1 = score1 + harvest_serach_score(f0_1[i], f0_candidates[:, i], f0_scores[:, i])
score2 = score2 + harvest_serach_score(f0_2[i], f0_candidates[:, i], f0_scores[:, i])
if score1 > score2:
merged_f0[int(ed1):int(ed2) + 1] = f0_2[int(ed1):int(ed2) + 1]
else:
merged_f0[int(st2):int(ed2) + 1] = f0_2[int(st2):int(ed2) + 1]
return merged_f0, new_ed
def harvest_serach_score(f0, f0_candidates, f0_scores):
score = 0
for i in range(len(f0_candidates)):
if (f0 == f0_candidates[i]) and (score < f0_scores[i]):
score = f0_scores[i]
return score
def harvest_extend_f0(f0, origin, last_point, shift, f0_candidates,
allowed_range):
threshold = 4
extended_f0 = f0.copy()
tmp_f0 = extended_f0[origin]
shifted_origin = origin
count = 0
for i in np.arange(origin, last_point + shift, shift):
bf0, bs = harvest_select_best_f0(tmp_f0,
f0_candidates[:, i + shift], allowed_range)
extended_f0[i + shift] = bf0
if extended_f0[i + shift] != 0:
tmp_f0 = extended_f0[i + shift]
count = 0
shifted_origin = i + shift
else:
count = count + 1
if count == threshold:
break
return extended_f0, shifted_origin
def harvest_get_multichannel_f0(f0, boundary_list):
multichannel_f0 = np.zeros((int(len(boundary_list) / 2), len(f0)))
for i in range(1, int(len(boundary_list) / 2) + 1):
sl = boundary_list[(2 * i) - 2]
el = boundary_list[(2 * i) - 1] + 1
multichannel_f0[i - 1, sl:el] = f0[sl:el]
return multichannel_f0
def harvest_get_boundary_list(f0):
vuv = f0.copy()
vuv[vuv != 0] = 1.
vuv[0] = 0
vuv[-1] = 0
diff_vuv = vuv[1:] - vuv[:-1]
boundary_list = np.where(diff_vuv != 0)[0]
boundary_list[::2] = boundary_list[::2] + 1
return boundary_list
def harvest_fix_step_4(f0_step3, threshold):
f0_step4 = f0_step3.copy()
boundary_list = harvest_get_boundary_list(f0_step3)
for i in range(1, int(len(boundary_list) / 2.)):
distance = boundary_list[(2 * i)] - boundary_list[(2 * i) - 1] - 1
if distance >= threshold:
continue
boundary0 = f0_step3[boundary_list[(2 * i) - 1]] + 1
boundary1 = f0_step3[boundary_list[(2 * i)]] - 1
coefficient = (boundary1 - boundary0) / float((distance + 1))
count = 1
st = boundary_list[(2 * i) - 1] + 1
ed = boundary_list[(2 * i)]
for j in range(st, ed):
f0_step4[j] = boundary0 + coefficient * count
count = count + 1
return f0_step4
def harvest_fix_f0_contour(f0_candidates, f0_scores):
f0_base = harvest_search_f0_base(f0_candidates, f0_scores)
f0_step1 = harvest_fix_step_1(f0_base, 0.008) # optimized?
f0_step2 = harvest_fix_step_2(f0_step1, 6) # optimized?
f0_step3 = harvest_fix_step_3(f0_step2, f0_candidates, 0.18, f0_scores) # optimized?
f0 = harvest_fix_step_4(f0_step3, 9) # optimized
vuv = f0.copy()
vuv[vuv != 0] = 1.
return f0, vuv
def harvest_filter_f0_contour(f0, st, ed, b, a):
smoothed_f0 = f0.copy()
smoothed_f0[:st] = smoothed_f0[st]
smoothed_f0[ed + 1:] = smoothed_f0[ed]
aaa = sg.lfilter(b, a, smoothed_f0)
bbb = sg.lfilter(b, a, aaa[::-1])
smoothed_f0 = bbb[::-1].copy()
smoothed_f0[:st] = 0.
smoothed_f0[ed + 1:] = 0.
return smoothed_f0
def harvest_smooth_f0_contour(f0):
b = np.array([0.0078202080334971724, 0.015640416066994345, 0.0078202080334971724])
a = np.array([1.0, -1.7347257688092754, 0.76600660094326412])
smoothed_f0 = np.concatenate([np.zeros(300,), f0, np.zeros(300,)])
boundary_list = harvest_get_boundary_list(smoothed_f0)
multichannel_f0 = harvest_get_multichannel_f0(smoothed_f0, boundary_list)
for i in range(1, int(len(boundary_list) / 2) + 1):
tmp_f0_contour = harvest_filter_f0_contour(multichannel_f0[i - 1, :],
boundary_list[(2 * i) - 2], boundary_list[(2 * i) - 1], b, a)
st = boundary_list[(2 * i) - 2]
ed = boundary_list[(2 * i) - 1] + 1
smoothed_f0[st:ed] = tmp_f0_contour[st:ed]
smoothed_f0 = smoothed_f0[300:-300]
return smoothed_f0
def _world_get_temporal_positions(x_len, fs):
frame_period = 5
basic_frame_period = 1
basic_temporal_positions = np.arange(0, x_len / float(fs), basic_frame_period / float(1000))
temporal_positions = np.arange(0,
x_len / float(fs),
frame_period / float(1000))
return basic_temporal_positions, temporal_positions
def harvest(x, fs):
f0_floor = 71
f0_ceil = 800
target_fs = 8000
channels_in_octave = 40.
basic_temporal_positions, temporal_positions = _world_get_temporal_positions(len(x), fs)
adjusted_f0_floor = f0_floor * 0.9
adjusted_f0_ceil = f0_ceil * 1.1
boundary_f0_list = np.arange(1, np.ceil(np.log2(adjusted_f0_ceil / adjusted_f0_floor) * channels_in_octave) + 1) / float(channels_in_octave)
boundary_f0_list = adjusted_f0_floor * 2.0 ** boundary_f0_list
y, actual_fs = harvest_get_downsampled_signal(x, fs, target_fs)
fft_size = 2. ** np.ceil(np.log2(len(y) + np.round(fs / f0_floor * 4) + 1))
y_spectrum = np.fft.fft(y, int(fft_size))
raw_f0_candidates = harvest_get_raw_f0_candidates(
len(basic_temporal_positions),
boundary_f0_list, len(y), basic_temporal_positions, actual_fs,
y_spectrum, f0_floor, f0_ceil)
f0_candidates, number_of_candidates = harvest_detect_official_f0_candidates(raw_f0_candidates)
f0_candidates = harvest_overlap_f0_candidates(f0_candidates, number_of_candidates)
f0_candidates, f0_scores = harvest_refine_candidates(y, actual_fs,
basic_temporal_positions, f0_candidates, f0_floor, f0_ceil)
f0_candidates, f0_scores = harvest_remove_unreliable_candidates(f0_candidates, f0_scores)
connected_f0, vuv = harvest_fix_f0_contour(f0_candidates, f0_scores)
smoothed_f0 = harvest_smooth_f0_contour(connected_f0)
idx = np.minimum(len(smoothed_f0) - 1, np.round(temporal_positions * 1000)).astype("int32")
f0 = smoothed_f0[idx]
vuv = vuv[idx]
f0_candidates = f0_candidates
return temporal_positions, f0, vuv, f0_candidates
def cheaptrick_get_windowed_waveform(x, fs, current_f0, current_position):
half_window_length = np.round(1.5 * fs / float(current_f0))
base_index = np.arange(-half_window_length, half_window_length + 1)
index = np.round(current_position * fs + 0.001) + base_index + 1
safe_index = np.minimum(len(x), np.maximum(1, np.round(index))).astype("int32")
safe_index = safe_index - 1
segment = x[safe_index]
time_axis = base_index / float(fs) / 1.5
window1 = 0.5 * np.cos(np.pi * time_axis * float(current_f0)) + 0.5
window1 = window1 / np.sqrt(np.sum(window1 ** 2))
waveform = segment * window1 - window1 * np.mean(segment * window1) / np.mean(window1)
return waveform
def cheaptrick_get_power_spectrum(waveform, fs, fft_size, f0):
power_spectrum = np.abs(np.fft.fft(waveform, fft_size)) ** 2
frequency_axis = np.arange(fft_size) / float(fft_size) * float(fs)
ind = frequency_axis < (f0 + fs / fft_size)
low_frequency_axis = frequency_axis[ind]
low_frequency_replica = interp1d(f0 - low_frequency_axis,
power_spectrum[ind], kind="linear",
fill_value="extrapolate")(low_frequency_axis)
p1 = low_frequency_replica[(frequency_axis < f0)[:len(low_frequency_replica)]]
p2 = power_spectrum[(frequency_axis < f0)[:len(power_spectrum)]]
power_spectrum[frequency_axis < f0] = p1 + p2
lb1 = int(fft_size / 2) + 1
lb2 = 1
ub2 = int(fft_size / 2)
power_spectrum[lb1:] = power_spectrum[lb2:ub2][::-1]
return power_spectrum
def cheaptrick_linear_smoothing(power_spectrum, f0, fs, fft_size):
double_frequency_axis = np.arange(2 * fft_size) / float(fft_size ) * fs - fs
double_spectrum = np.concatenate([power_spectrum, power_spectrum])
double_segment = np.cumsum(double_spectrum * (fs / float(fft_size)))
center_frequency = np.arange(int(fft_size / 2) + 1) / float(fft_size ) * fs
low_levels = cheaptrick_interp1h(double_frequency_axis + fs / float(fft_size) / 2.,
double_segment, center_frequency - f0 / 3.)
high_levels = cheaptrick_interp1h(double_frequency_axis + fs / float(fft_size) / 2.,
double_segment, center_frequency + f0 / 3.)
smoothed_spectrum = (high_levels - low_levels) * 1.5 / f0
return smoothed_spectrum
def cheaptrick_interp1h(x, y, xi):
delta_x = float(x[1] - x[0])
xi = np.maximum(x[0], np.minimum(x[-1], xi))
xi_base = (np.floor((xi - x[0]) / delta_x)).astype("int32")
xi_fraction = (xi - x[0]) / delta_x - xi_base
delta_y = np.zeros_like(y)
delta_y[:-1] = y[1:] - y[:-1]
yi = y[xi_base] + delta_y[xi_base] * xi_fraction
return yi
def cheaptrick_smoothing_with_recovery(smoothed_spectrum, f0, fs, fft_size, q1):
quefrency_axis = np.arange(fft_size) / float(fs)
# 0 is NaN
smoothing_lifter = np.sin(np.pi * f0 * quefrency_axis) / (np.pi * f0 * quefrency_axis)
p = smoothing_lifter[1:int(fft_size / 2)][::-1].copy()
smoothing_lifter[int(fft_size / 2) + 1:] = p
smoothing_lifter[0] = 1.
compensation_lifter = (1 - 2. * q1) + 2. * q1 * np.cos(2 * np.pi * quefrency_axis * f0)
p = compensation_lifter[1:int(fft_size / 2)][::-1].copy()
compensation_lifter[int(fft_size / 2) + 1:] = p
tandem_cepstrum = np.fft.fft(np.log(smoothed_spectrum))
tmp_spectral_envelope = np.exp(np.real(np.fft.ifft(tandem_cepstrum * smoothing_lifter * compensation_lifter)))
spectral_envelope = tmp_spectral_envelope[:int(fft_size / 2) + 1]
return spectral_envelope
def cheaptrick_estimate_one_slice(x, fs, current_f0,
current_position, fft_size, q1):
waveform = cheaptrick_get_windowed_waveform(x, fs, current_f0,
current_position)
power_spectrum = cheaptrick_get_power_spectrum(waveform, fs, fft_size,
current_f0)
smoothed_spectrum = cheaptrick_linear_smoothing(power_spectrum, current_f0,
fs, fft_size)
comb_spectrum = np.concatenate([smoothed_spectrum, smoothed_spectrum[1:-1][::-1]])
spectral_envelope = cheaptrick_smoothing_with_recovery(comb_spectrum,
current_f0, fs, fft_size, q1)
return spectral_envelope
def cheaptrick(x, fs, temporal_positions, f0_sequence,
vuv, fftlen="auto", q1=-0.15):
f0_sequence = f0_sequence.copy()
f0_low_limit = 71
default_f0 = 500
if fftlen == "auto":
fftlen = int(2 ** np.ceil(np.log2(3. * float(fs) / f0_low_limit + 1)))
#raise ValueError("Only fftlen auto currently supported")
fft_size = fftlen
f0_low_limit = fs * 3.0 / (fft_size - 3.0)
f0_sequence[vuv == 0] = default_f0
spectrogram = np.zeros((int(fft_size / 2.) + 1, len(f0_sequence)))
for i in range(len(f0_sequence)):
if f0_sequence[i] < f0_low_limit:
f0_sequence[i] = default_d0
spectrogram[:, i] = cheaptrick_estimate_one_slice(x, fs, f0_sequence[i],
temporal_positions[i], fft_size, q1)
return temporal_positions, spectrogram.T, fs
def d4c_love_train(x, fs, current_f0, current_position, threshold):
vuv = 0
if current_f0 == 0:
return vuv
lowest_f0 = 40
current_f0 = max([current_f0, lowest_f0])
fft_size = int(2 ** np.ceil(np.log2(3. * fs / lowest_f0 + 1)))
boundary0 = int(np.ceil(100 / (float(fs) / fft_size)))
boundary1 = int(np.ceil(4000 / (float(fs) / fft_size)))
boundary2 = int(np.ceil(7900 / (float(fs) / fft_size)))
waveform = d4c_get_windowed_waveform(x, fs, current_f0, current_position,
1.5, 2)
power_spectrum = np.abs(np.fft.fft(waveform, int(fft_size)) ** 2)
power_spectrum[0:boundary0 + 1] = 0.
cumulative_spectrum = np.cumsum(power_spectrum)
if (cumulative_spectrum[boundary1] / cumulative_spectrum[boundary2]) > threshold:
vuv = 1
return vuv
def d4c_get_windowed_waveform(x, fs, current_f0, current_position, half_length,
window_type):
half_window_length = int(np.round(half_length * fs / current_f0))
base_index = np.arange(-half_window_length, half_window_length + 1)
index = np.round(current_position * fs + 0.001) + base_index + 1
safe_index = np.minimum(len(x), np.maximum(1, np.round(index))).astype("int32") - 1
segment = x[safe_index]
time_axis = base_index / float(fs) / float(half_length)
if window_type == 1:
window1 = 0.5 * np.cos(np.pi * time_axis * current_f0) + 0.5
elif window_type == 2:
window1 = 0.08 * np.cos(np.pi * time_axis * current_f0 * 2)
window1 += 0.5 * np.cos(np.pi * time_axis * current_f0) + 0.42
else:
raise ValueError("Unknown window type")
waveform = segment * window1 - window1 * np.mean(segment * window1) / np.mean(window1)
return waveform
def d4c_get_static_centroid(x, fs, current_f0, current_position, fft_size):
waveform1 = d4c_get_windowed_waveform(x, fs, current_f0,
current_position + 1. / current_f0 / 4., 2, 2)
waveform2 = d4c_get_windowed_waveform(x, fs, current_f0,
current_position - 1. / current_f0 / 4., 2, 2)
centroid1 = d4c_get_centroid(waveform1, fft_size)
centroid2 = d4c_get_centroid(waveform2, fft_size)
centroid = d4c_dc_correction(centroid1 + centroid2, fs, fft_size,
current_f0)
return centroid
def d4c_get_centroid(x, fft_size):
fft_size = int(fft_size)
time_axis = np.arange(1, len(x) + 1)
x = x.copy()
x = x / np.sqrt(np.sum(x ** 2))
spectrum = np.fft.fft(x, fft_size)
weighted_spectrum = np.fft.fft(-x * 1j * time_axis, fft_size)
centroid = -(weighted_spectrum.imag) * spectrum.real + spectrum.imag * weighted_spectrum.real
return centroid
def d4c_dc_correction(signal, fs, fft_size, f0):
fft_size = int(fft_size)
frequency_axis = np.arange(fft_size) / fft_size * fs
low_frequency_axis = frequency_axis[frequency_axis < f0 + fs / fft_size]
low_frequency_replica = interp1d(f0 - low_frequency_axis,
signal[frequency_axis < f0 + fs / fft_size],
kind="linear",
fill_value="extrapolate")(low_frequency_axis)
idx = frequency_axis < f0
signal[idx] = low_frequency_replica[idx[:len(low_frequency_replica)]] + signal[idx]
signal[int(fft_size / 2.) + 1:] = signal[1 : int(fft_size / 2.)][::-1]
return signal
def d4c_linear_smoothing(group_delay, fs, fft_size, width):
double_frequency_axis = np.arange(2 * fft_size) / float(fft_size ) * fs - fs
double_spectrum = np.concatenate([group_delay, group_delay])
double_segment = np.cumsum(double_spectrum * (fs / float(fft_size)))
center_frequency = np.arange(int(fft_size / 2) + 1) / float(fft_size ) * fs
low_levels = cheaptrick_interp1h(double_frequency_axis + fs / float(fft_size) / 2.,
double_segment, center_frequency - width / 2.)
high_levels = cheaptrick_interp1h(double_frequency_axis + fs / float(fft_size) / 2.,
double_segment, center_frequency + width / 2.)
smoothed_spectrum = (high_levels - low_levels) / width
return smoothed_spectrum
def d4c_get_smoothed_power_spectrum(waveform, fs, f0, fft_size):
power_spectrum = np.abs(np.fft.fft(waveform, int(fft_size))) ** 2
spectral_envelope = d4c_dc_correction(power_spectrum, fs, fft_size, f0)
spectral_envelope = d4c_linear_smoothing(spectral_envelope, fs, fft_size, f0)
spectral_envelope = np.concatenate([spectral_envelope,
spectral_envelope[1:-1][::-1]])
return spectral_envelope
def d4c_get_static_group_delay(static_centroid, smoothed_power_spectrum, fs, f0,
fft_size):
group_delay = static_centroid / smoothed_power_spectrum
group_delay = d4c_linear_smoothing(group_delay, fs, fft_size, f0 / 2.)
group_delay = np.concatenate([group_delay, group_delay[1:-1][::-1]])
smoothed_group_delay = d4c_linear_smoothing(group_delay, fs, fft_size, f0)
group_delay = group_delay[:int(fft_size / 2) + 1] - smoothed_group_delay
group_delay = np.concatenate([group_delay, group_delay[1:-1][::-1]])
return group_delay
def d4c_get_coarse_aperiodicity(group_delay, fs, fft_size,
frequency_interval, number_of_aperiodicities, window1):
boundary = np.round(fft_size / len(window1) * 8)
half_window_length = np.floor(len(window1) / 2)
coarse_aperiodicity = np.zeros((number_of_aperiodicities, 1))
for i in range(1, number_of_aperiodicities + 1):
center = np.floor(frequency_interval * i / (fs / float(fft_size)))
segment = group_delay[int(center - half_window_length):int(center + half_window_length + 1)] * window1
power_spectrum = np.abs(np.fft.fft(segment, int(fft_size))) ** 2
cumulative_power_spectrum = np.cumsum(np.sort(power_spectrum[:int(fft_size / 2) + 1]))
coarse_aperiodicity[i - 1] = -10 * np.log10(cumulative_power_spectrum[int(fft_size / 2 - boundary) - 1] / cumulative_power_spectrum[-1])
return coarse_aperiodicity
def d4c_estimate_one_slice(x, fs, current_f0, frequency_interval,
current_position, fft_size, number_of_aperiodicities, window1):
if current_f0 == 0:
coarse_aperiodicity = np.zeros((number_of_aperiodicities, 1))
return coarse_aperiodicity
static_centroid = d4c_get_static_centroid(x, fs, current_f0,
current_position, fft_size)
waveform = d4c_get_windowed_waveform(x, fs, current_f0, current_position,
2, 1)
smoothed_power_spectrum = d4c_get_smoothed_power_spectrum(waveform, fs,
current_f0, fft_size)
static_group_delay = d4c_get_static_group_delay(static_centroid,
smoothed_power_spectrum, fs, current_f0, fft_size)
coarse_aperiodicity = d4c_get_coarse_aperiodicity(static_group_delay,
fs, fft_size, frequency_interval, number_of_aperiodicities, window1)
return coarse_aperiodicity
def d4c(x, fs, temporal_positions_h, f0_h, vuv_h, threshold="default",
fft_size="auto"):
f0_low_limit = 47
if fft_size == "auto":
fft_size = 2 ** np.ceil(np.log2(4. * fs / f0_low_limit + 1.))
else:
raise ValueError("Only fft_size auto currently supported")
f0_low_limit_for_spectrum = 71
fft_size_for_spectrum = 2 ** np.ceil(np.log2(3 * fs / f0_low_limit_for_spectrum + 1.))
threshold = 0.85
upper_limit = 15000
frequency_interval = 3000
f0 = f0_h.copy()
temporal_positions = temporal_positions_h.copy()
f0[vuv_h == 0] = 0.
number_of_aperiodicities = int(np.floor(np.min([upper_limit, fs / 2. - frequency_interval]) / float(frequency_interval)))
window_length = np.floor(frequency_interval / (fs / float(fft_size))) * 2 + 1
window1 = harvest_nuttall(window_length)
aperiodicity = np.zeros((int(fft_size_for_spectrum / 2) + 1, len(f0)))
coarse_ap = np.zeros((1, len(f0)))
frequency_axis = np.arange(int(fft_size_for_spectrum / 2) + 1) * float(fs) / fft_size_for_spectrum
coarse_axis = np.arange(number_of_aperiodicities + 2) * frequency_interval
coarse_axis[-1] = fs / 2.
for i in range(len(f0)):
r = d4c_love_train(x, fs, f0[i], temporal_positions_h[i], threshold)
if r == 0:
aperiodicity[:, i] = 1 - 0.000000000001
continue
current_f0 = max([f0_low_limit, f0[i]])
coarse_aperiodicity = d4c_estimate_one_slice(x, fs, current_f0,
frequency_interval, temporal_positions[i], fft_size,
number_of_aperiodicities, window1)
coarse_ap[0, i] = coarse_aperiodicity.ravel()[0]
coarse_aperiodicity = np.maximum(0, coarse_aperiodicity - (current_f0 - 100) * 2. / 100.)
piece = np.concatenate([[-60], -coarse_aperiodicity.ravel(), [-0.000000000001]])
part = interp1d(coarse_axis, piece, kind="linear")(frequency_axis) / 20.
aperiodicity[:, i] = 10 ** part
return temporal_positions_h, f0_h, vuv_h, aperiodicity.T, coarse_ap.squeeze()
def world_synthesis_time_base_generation(temporal_positions, f0, fs, vuv,
time_axis, default_f0):
f0_interpolated_raw = interp1d(temporal_positions, f0, kind="linear",
fill_value="extrapolate")(time_axis)
vuv_interpolated = interp1d(temporal_positions, vuv, kind="linear",
fill_value="extrapolate")(time_axis)
vuv_interpolated = vuv_interpolated > 0.5
f0_interpolated = f0_interpolated_raw * vuv_interpolated.astype("float32")
f0_interpolated[f0_interpolated == 0] = f0_interpolated[f0_interpolated == 0] + default_f0
total_phase = np.cumsum(2 * np.pi * f0_interpolated / float(fs))
core = np.mod(total_phase, 2 * np.pi)
core = np.abs(core[1:] - core[:-1])
# account for diff, avoid deprecation warning with [:-1]
pulse_locations = time_axis[:-1][core > (np.pi / 2.)]
pulse_locations_index = np.round(pulse_locations * fs).astype("int32")
return pulse_locations, pulse_locations_index, vuv_interpolated
def world_synthesis_get_spectral_parameters(temporal_positions,
temporal_position_index, spectrogram, amplitude_periodic,
amplitude_random, pulse_locations):
floor_index = int(np.floor(temporal_position_index) - 1)
assert floor_index >= 0
ceil_index = int(np.ceil(temporal_position_index) - 1)
t1 = temporal_positions[floor_index]
t2 = temporal_positions[ceil_index]
if t1 == t2:
spectrum_slice = spectrogram[:, floor_index]
periodic_slice = amplitude_periodic[:, floor_index]
aperiodic_slice = amplitude_random[:, floor_index]
else:
cs = np.concatenate([spectrogram[:, floor_index][None],
spectrogram[:, ceil_index][None]], axis=0)
mmm = max([t1, min([t2, pulse_locations])])
spectrum_slice = interp1d(np.array([t1, t2]), cs,
kind="linear", axis=0)(mmm.copy())
cp = np.concatenate([amplitude_periodic[:, floor_index][None],
amplitude_periodic[:, ceil_index][None]], axis=0)
periodic_slice = interp1d(np.array([t1, t2]), cp,
kind="linear", axis=0)(mmm.copy())
ca = np.concatenate([amplitude_random[:, floor_index][None],
amplitude_random[:, ceil_index][None]], axis=0)
aperiodic_slice = interp1d(np.array([t1, t2]), ca,
kind="linear", axis=0)(mmm.copy())
return spectrum_slice, periodic_slice, aperiodic_slice
"""
Filter data with an FIR filter using the overlap-add method.
from http://projects.scipy.org/scipy/attachment/ticket/837/fftfilt.py
"""
def nextpow2(x):
"""Return the first integer N such that 2**N >= abs(x)"""
return np.ceil(np.log2(np.abs(x)))
def fftfilt(b, x, *n):
"""Filter the signal x with the FIR filter described by the
coefficients in b using the overlap-add method. If the FFT
length n is not specified, it and the overlap-add block length
are selected so as to minimize the computational cost of
the filtering operation."""
N_x = len(x)
N_b = len(b)
# Determine the FFT length to use:
if len(n):
# Use the specified FFT length (rounded up to the nearest
# power of 2), provided that it is no less than the filter
# length:
n = n[0]
if n != int(n) or n <= 0:
raise ValueError('n must be a nonnegative integer')
if n < N_b:
n = N_b
N_fft = 2**nextpow2(n)
else:
if N_x > N_b:
# When the filter length is smaller than the signal,
# choose the FFT length and block size that minimize the
# FLOPS cost. Since the cost for a length-N FFT is
# (N/2)*log2(N) and the filtering operation of each block
# involves 2 FFT operations and N multiplications, the
# cost of the overlap-add method for 1 length-N block is
# N*(1+log2(N)). For the sake of efficiency, only FFT
# lengths that are powers of 2 are considered:
N = 2**np.arange(np.ceil(np.log2(N_b)),
np.floor(np.log2(N_x)))
cost = np.ceil(N_x/(N-N_b+1))*N*(np.log2(N)+1)
N_fft = N[np.argmin(cost)]
else:
# When the filter length is at least as long as the signal,
# filter the signal using a single block:
N_fft = 2**nextpow2(N_b+N_x-1)
N_fft = int(N_fft)
# Compute the block length:
L = int(N_fft - N_b + 1)
# Compute the transform of the filter:
H = np.fft.fft(b, N_fft)
y = np.zeros(N_x, dtype=np.float32)
i = 0
while i <= N_x:
il = min([i+L,N_x])
k = min([i+N_fft,N_x])
yt = np.fft.ifft(np.fft.fft(x[i:il],N_fft)*H,N_fft) # Overlap..
y[i:k] = y[i:k] + yt[:k-i] # and add
i += L
return y
def world_synthesis(f0_d4c, vuv_d4c, aperiodicity_d4c,
spectrogram_ct, fs_ct, random_seed=1999):
# swap 0 and 1 axis
spectrogram_ct = spectrogram_ct.T
fs = fs_ct
# coarse -> fine aper
if len(aperiodicity_d4c.shape) == 1 or aperiodicity_d4c.shape[1] == 1:
print("Coarse aperiodicity detected - interpolating to full size")
aper = np.zeros_like(spectrogram_ct)
if len(aperiodicity_d4c.shape) == 1:
aperiodicity_d4c = aperiodicity_d4c[None, :]
else:
aperiodicity_d4c = aperiodicity_d4c.T
coarse_aper_d4c = aperiodicity_d4c
frequency_interval = 3000
upper_limit = 15000
number_of_aperiodicities = int(np.floor(np.min([upper_limit, fs / 2. - frequency_interval]) / float(frequency_interval)))
coarse_axis = np.arange(number_of_aperiodicities + 2) * frequency_interval
coarse_axis[-1] = fs / 2.
f0_low_limit_for_spectrum = 71
fft_size_for_spectrum = 2 ** np.ceil(np.log2(3 * fs / f0_low_limit_for_spectrum + 1.))
frequency_axis = np.arange(int(fft_size_for_spectrum / 2) + 1) * float(fs) / fft_size_for_spectrum
for i in range(len(f0_d4c)):
ca = coarse_aper_d4c[0, i]
cf = f0_d4c[i]
coarse_aperiodicity = np.maximum(0, ca - (cf - 100) * 2. / 100.)
piece = np.concatenate([[-60], -ca.ravel(), [-0.000000000001]])
part = interp1d(coarse_axis, piece, kind="linear")(frequency_axis) / 20.
aper[:, i] = 10 ** part
aperiodicity_d4c = aper
else:
aperiodicity_d4c = aperiodicity_d4c.T
default_f0 = 500.
random_state = np.random.RandomState(1999)
spectrogram = spectrogram_ct
aperiodicity = aperiodicity_d4c
# max 30s, if greater than thrown an error
max_len = 5000000
_, temporal_positions = _world_get_temporal_positions(max_len, fs)
temporal_positions = temporal_positions[:spectrogram.shape[1]]
#temporal_positions = temporal_positions_d4c
#from IPython import embed; embed()
#raise ValueError()
vuv = vuv_d4c
f0 = f0_d4c
time_axis = np.arange(temporal_positions[0], temporal_positions[-1],
1. / fs)
y = 0. * time_axis
r = world_synthesis_time_base_generation(temporal_positions, f0, fs, vuv,
time_axis, default_f0)
pulse_locations, pulse_locations_index, interpolated_vuv = r
fft_size = int((len(spectrogram) - 1) * 2)
base_index = np.arange(-fft_size / 2, fft_size / 2) + 1
y_length = len(y)
tmp_complex_cepstrum = np.zeros((fft_size,), dtype=np.complex128)
latter_index = np.arange(int(fft_size / 2) + 1, fft_size + 1) - 1
temporal_position_index = interp1d(temporal_positions, np.arange(1, len(temporal_positions) + 1), kind="linear", fill_value="extrapolate")(pulse_locations)
temporal_postion_index = np.maximum(1, np.minimum(len(temporal_positions),
temporal_position_index)) - 1
amplitude_aperiodic = aperiodicity ** 2
amplitude_periodic = np.maximum(0.001, (1. - amplitude_aperiodic))
for i in range(len(pulse_locations_index)):
spectrum_slice, periodic_slice, aperiodic_slice = world_synthesis_get_spectral_parameters(
temporal_positions, temporal_position_index[i], spectrogram,
amplitude_periodic, amplitude_aperiodic, pulse_locations[i])
idx = min(len(pulse_locations_index), i + 2) - 1
noise_size = pulse_locations_index[idx] - pulse_locations_index[i]
output_buffer_index = np.maximum(1, np.minimum(y_length, pulse_locations_index[i] + 1 + base_index)).astype("int32") - 1
if interpolated_vuv[pulse_locations_index[i]] >= 0.5:
tmp_periodic_spectrum = spectrum_slice * periodic_slice
# eps in matlab/octave
tmp_periodic_spectrum[tmp_periodic_spectrum == 0] = 2.2204E-16
periodic_spectrum = np.concatenate([tmp_periodic_spectrum,
tmp_periodic_spectrum[1:-1][::-1]])
tmp_cepstrum = np.real(np.fft.fft(np.log(np.abs(periodic_spectrum)) / 2.))
tmp_complex_cepstrum[latter_index] = tmp_cepstrum[latter_index] * 2
tmp_complex_cepstrum[0] = tmp_cepstrum[0]
response = np.fft.fftshift(np.real(np.fft.ifft(np.exp(np.fft.ifft(
tmp_complex_cepstrum)))))
y[output_buffer_index] += response * np.sqrt(
max([1, noise_size]))
tmp_aperiodic_spectrum = spectrum_slice * aperiodic_slice
else:
tmp_aperiodic_spectrum = spectrum_slice
tmp_aperiodic_spectrum[tmp_aperiodic_spectrum == 0] = 2.2204E-16
aperiodic_spectrum = np.concatenate([tmp_aperiodic_spectrum,
tmp_aperiodic_spectrum[1:-1][::-1]])
tmp_cepstrum = np.real(np.fft.fft(np.log(np.abs(aperiodic_spectrum)) / 2.))
tmp_complex_cepstrum[latter_index] = tmp_cepstrum[latter_index] * 2
tmp_complex_cepstrum[0] = tmp_cepstrum[0]
rc = np.fft.ifft(tmp_complex_cepstrum)
erc = np.exp(rc)
response = np.fft.fftshift(np.real(np.fft.ifft(erc)))
noise_input = random_state.randn(max([3, noise_size]),)
y[output_buffer_index] = y[output_buffer_index] + fftfilt(noise_input - np.mean(noise_input), response)
return y
def _mgc_b2c(wc, c, alpha):
wc_o = np.zeros_like(wc)
desired_order = len(wc) - 1
for i in range(0, len(c))[::-1]:
prev = copy.copy(wc_o)
wc_o[0] = c[i]
if desired_order >= 1:
wc_o[1] = (1. - alpha ** 2) * prev[0] + alpha * prev[1]
for m in range(2, desired_order + 1):
wc_o[m] = prev[m - 1] + alpha * (prev[m] - wc_o[m - 1])
return wc_o
def _mgc_ptrans(p, m, alpha):
d = 0.
o = 0.
d = p[m]
for i in range(1, m)[::-1]:
o = p[i] + alpha * d
d = p[i]
p[i] = o
o = alpha * d
p[0] = (1. - alpha ** 2) * p[0] + 2 * o
def _mgc_qtrans(q, m, alpha):
d = q[1]
for i in range(2, 2 * m + 1):
o = q[i] + alpha * d
d = q[i]
q[i] = o
def _mgc_gain(er, c, m, g):
t = 0.
if g != 0:
for i in range(1, m + 1):
t += er[i] * c[i]
return er[0] + g * t
else:
return er[0]
def _mgc_fill_toeplitz(A, t):
n = len(t)
for i in range(n):
for j in range(n):
A[i, j] = t[i - j] if i - j >= 0 else t[j - i]
def _mgc_fill_hankel(A, t):
n = len(t) // 2 + 1
for i in range(n):
for j in range(n):
A[i, j] = t[i + j]
def _mgc_ignorm(c, gamma):
if gamma == 0.:
c[0] = np.log(c[0])
return c
gain = c[0] ** gamma
c[1:] *= gain
c[0] = (gain - 1.) / gamma
def _mgc_gnorm(c, gamma):
if gamma == 0.:
c[0] = np.exp(c[0])
return c
gain = 1. + gamma * c[0]
c[1:] /= gain
c[0] = gain ** (1. / gamma)
def _mgc_b2mc(mc, alpha):
m = len(mc)
o = 0.
d = mc[m - 1]
for i in range(m - 1)[::-1]:
o = mc[i] + alpha * d
d = mc[i]
mc[i] = o
def _mgc_mc2b(mc, alpha):
itr = range(len(mc) - 1)[::-1]
for i in itr:
mc[i] = mc[i] - alpha * mc[i + 1]
def _mgc_gc2gc(src_ceps, src_gamma=0., dst_order=None, dst_gamma=0.):
if dst_order == None:
dst_order = len(src_ceps) - 1
dst_ceps = np.zeros((dst_order + 1,), dtype=src_ceps.dtype)
dst_order = len(dst_ceps) - 1
m1 = len(src_ceps) - 1
dst_ceps[0] = copy.deepcopy(src_ceps[0])
for m in range(2, dst_order + 2):
ss1 = 0.
ss2 = 0.
min_1 = m1 if (m1 < m - 1) else m - 2
itr = range(2, min_1 + 2)
if len(itr) < 1:
if min_1 + 1 == 2:
itr = [2]
else:
itr = []
"""
# old slower version
for k in itr:
assert k >= 1
assert (m - k) >= 0
cc = src_ceps[k - 1] * dst_ceps[m - k]
ss2 += (k - 1) * cc
ss1 += (m - k) * cc
"""
if len(itr) > 0:
itr = np.array(itr)
cc_a = src_ceps[itr - 1] * dst_ceps[m - itr]
ss2 += ((itr - 1) * cc_a).sum()
ss1 += ((m - itr) * cc_a).sum()
if m <= m1 + 1: