-
-
Save attitudechunfeng/58a052f18f6aa24235000cc50618e887 to your computer and use it in GitHub Desktop.
nn.py is the training script and nn_synth.py is the synthesis (test) script.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from nnmnkwii.datasets import FileDataSource, FileSourceDataset | |
from nnmnkwii.datasets import PaddedFileSourceDataset, MemoryCacheDataset | |
from nnmnkwii.preprocessing import trim_zeros_frames, remove_zeros_frames | |
from nnmnkwii.preprocessing import minmax, meanvar, minmax_scale, scale | |
from nnmnkwii import paramgen | |
from nnmnkwii.io import hts | |
from nnmnkwii.frontend import merlin as fe | |
from nnmnkwii.postfilters import merlin_post_filter | |
from nnmnkwii.datasets import MemoryCacheFramewiseDataset | |
from os.path import join, expanduser, basename, splitext, basename, exists | |
import os | |
from glob import glob | |
import numpy as np | |
from scipy.io import wavfile | |
from sklearn.model_selection import train_test_split | |
import pyworld | |
import pysptk | |
from torch.utils import data as data_utils | |
import torch | |
from torch import nn | |
from torch.autograd import Variable | |
from tqdm import tnrange, tqdm | |
from torch import optim | |
import pickle as pkl | |
#-----------------
# Experiment configuration: where the prepared data lives and how the
# train/test split is drawn (the synthesis script must use the same split).
DATA_ROOT = "./data/slt_arctic_full_data"
test_size = 0.112
random_state = 1234
#-----------------
# Feature stream widths.  These include delta features for MLPG:
# mgc is presumably 60 static coefficients x 3 windows -- TODO confirm.
mgc_dim = 180   # mel-generalized cepstrum stream
lf0_dim = 3     # log-F0 stream (static + delta + delta-delta)
vuv_dim = 1     # voiced/unvoiced flag
bap_dim = 3     # band aperiodicity stream
duration_linguistic_dim = 416   # linguistic input size, duration model
acoustic_linguisic_dim = 425    # linguistic input size, acoustic model
duration_dim = 5                # duration targets per phone (presumably 5 HMM states -- verify)
acoustic_dim = mgc_dim + lf0_dim + vuv_dim + bap_dim  # 187 acoustic targets per frame
# Vocoder / analysis parameters.
fs = 16000          # sampling rate [Hz]
frame_period = 5    # frame shift [ms]
hop_length = 80     # samples per frame: fs * frame_period / 1000
fftlen = 1024       # FFT size for spectral envelope recovery
alpha = 0.41        # all-pass constant for mel-cepstral analysis at 16 kHz
# Start index of each stream inside an acoustic feature vector.
mgc_start_idx = 0
lf0_start_idx = 180
vuv_start_idx = 183
bap_start_idx = 184
# MLPG delta windows: static, delta, delta-delta.
windows = [
    (0, 0, np.array([1.0])),
    (1, 1, np.array([-0.5, 0.0, 0.5])),
    (1, 1, np.array([1.0, -2.0, 1.0])),
]
#-----------------
class BinaryFileSource(FileDataSource):
    """Data source that reads flat float32 binary feature files.

    The available files are partitioned into train/test subsets using the
    module-level `test_size` / `random_state`; the `train` flag selects
    which side this source yields.
    """
    def __init__(self, data_root, dim, train):
        self.data_root = data_root
        self.dim = dim
        self.train = train

    def collect_files(self):
        candidates = sorted(glob(join(self.data_root, "*.bin")))
        # Hold out the final five utterances as a "real" test set that
        # never enters the split below.
        candidates = candidates[:len(candidates) - 5]
        split = train_test_split(candidates, test_size=test_size,
                                 random_state=random_state)
        return split[0] if self.train else split[1]

    def collect_features(self, path):
        # Flat binary file -> (num_frames, dim) float32 matrix.
        return np.fromfile(path, dtype=np.float32).reshape(-1, self.dim)
#-----------------
# Build file-backed datasets for both model types: "duration" maps
# phone-level linguistic features to state durations; "acoustic" maps
# frame-level linguistic features to vocoder parameters.
X = {"duration": {}, "acoustic": {}}
Y = {"duration": {}, "acoustic": {}}
utt_lengths = {"duration": {}, "acoustic": {}}
for ty in ["duration", "acoustic"]:
    for phase in ["train", "test"]:
        train = phase == "train"
        x_dim = duration_linguistic_dim if ty == "duration" else acoustic_linguisic_dim
        y_dim = duration_dim if ty == "duration" else acoustic_dim
        X[ty][phase] = FileSourceDataset(
            BinaryFileSource(join(DATA_ROOT, "X_{}".format(ty)),
                             dim=x_dim, train=train))
        Y[ty][phase] = FileSourceDataset(
            BinaryFileSource(join(DATA_ROOT, "Y_{}".format(ty)),
                             dim=y_dim, train=train))
        # np.int was removed in NumPy 1.24; the builtin int is equivalent.
        utt_lengths[ty][phase] = np.array([len(x) for x in X[ty][phase]],
                                          dtype=int)
#-----------------
# Compute normalization statistics from the training partitions only:
# per-feature min/max for the linguistic inputs and mean/std for the
# targets.  Persist them so the synthesis script can reuse them.
X_min, X_max = {}, {}
Y_mean, Y_var, Y_scale = {}, {}, {}
for typ in ["acoustic", "duration"]:
    X_min[typ], X_max[typ] = minmax(X[typ]["train"], utt_lengths[typ]["train"])
    Y_mean[typ], Y_var[typ] = meanvar(Y[typ]["train"], utt_lengths[typ]["train"])
    Y_scale[typ] = np.sqrt(Y_var[typ])
norm_params = [X_min, X_max, Y_mean, Y_var, Y_scale]
with open('norm_params.pkl', 'wb') as fout:
    pkl.dump(norm_params, fout)
#-----------------
class PyTorchDataset(torch.utils.data.Dataset):
    """Thin dataset wrapper for pytorch.

    Does exactly two things:
      1. On-demand normalization (min/max scaling of inputs,
         mean/variance scaling of targets).
      2. Returns torch tensors instead of ndarrays.
    """
    def __init__(self, X, Y, X_min, X_max, Y_mean, Y_scale):
        self.X = X
        self.Y = Y
        self.X_min = X_min
        self.X_max = X_max
        self.Y_mean = Y_mean
        self.Y_scale = Y_scale

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        features = minmax_scale(self.X[idx], self.X_min, self.X_max,
                                feature_range=(0.01, 0.99))
        targets = scale(self.Y[idx], self.Y_mean, self.Y_scale)
        return torch.from_numpy(features), torch.from_numpy(targets)
#-----------------
class MyNet(torch.nn.Module):
    """Simple feed-forward network: an input projection, `num_layers`
    hidden layers of width H, and a linear output layer.

    NOTE(review): the activation attribute is named `relu` but actually
    holds Tanh; the name is kept so pickled checkpoints keep loading.
    """
    def __init__(self, D_in, H, D_out, num_layers=2):
        super(MyNet, self).__init__()
        self.first_linear = nn.Linear(D_in, H)
        self.hidden_layers = nn.ModuleList(
            [nn.Linear(H, H) for _ in range(num_layers)])
        self.last_linear = nn.Linear(H, D_out)
        self.relu = nn.Tanh()

    def forward(self, x):
        hidden = self.relu(self.first_linear(x))
        for layer in self.hidden_layers:
            hidden = self.relu(layer(hidden))
        return self.last_linear(hidden)
#-----------------
# Training hyper-parameters shared by the duration and acoustic models.
num_hidden_layers = 3
hidden_size = 256
batch_size = 256
# We use PyTorch's multiprocess iterator. Note that a large n_workers
# causes dataset copies across processes.
n_workers = 4
pin_memory = True
nepoch = 25
lr = 0.001
weight_decay = 1e-6
use_cuda = torch.cuda.is_available()
#-----------------
def train(model, optimizer, X, Y, X_min, X_max, Y_mean, Y_scale,
          utt_lengths, cache_size=1000):
    """Run the frame-wise train/eval loop for one model.

    Args:
        model: network to optimize (moved to GPU if available).
        optimizer: torch optimizer configured for ``model``'s parameters.
        X, Y: dicts mapping "train"/"test" to utterance-level datasets.
        X_min, X_max, Y_mean, Y_scale: normalization statistics.
        utt_lengths: dict of per-utterance frame counts for each phase.
        cache_size: utterances kept in memory by the framewise cache.

    Returns:
        dict mapping "train"/"test" to a list of per-epoch mean losses.
    """
    if use_cuda:
        model = model.cuda()

    def _make_loader(phase, shuffle):
        # Flatten utterance-level datasets into frame-level ones and wrap
        # with normalization + tensor conversion.
        x = MemoryCacheFramewiseDataset(X[phase], utt_lengths[phase], cache_size)
        y = MemoryCacheFramewiseDataset(Y[phase], utt_lengths[phase], cache_size)
        dataset = PyTorchDataset(x, y, X_min, X_max, Y_mean, Y_scale)
        loader = data_utils.DataLoader(
            dataset, batch_size=batch_size, num_workers=n_workers,
            pin_memory=pin_memory, shuffle=shuffle)
        return dataset, loader

    train_dataset, train_loader = _make_loader("train", shuffle=True)
    print("Train dataset number of frames", len(train_dataset))
    test_dataset, test_loader = _make_loader("test", shuffle=False)
    print("Test dataset number of frames", len(test_dataset))
    dataset_loaders = {"train": train_loader, "test": test_loader}

    # Training loop: one train pass and one eval pass per epoch.
    criterion = nn.MSELoss()
    print("Start frame-wise training...")
    loss_history = {"train": [], "test": []}
    for epoch in tnrange(nepoch):
        for phase in ["train", "test"]:
            if phase == "train":
                model.train()
            else:
                model.eval()
            running_loss = 0.0
            for x, y in dataset_loaders[phase]:
                if use_cuda:
                    x, y = x.cuda(), y.cuda()
                x, y = Variable(x), Variable(y)
                optimizer.zero_grad()
                y_hat = model(x)
                loss = criterion(y_hat, y)
                if phase == "train":
                    loss.backward()
                    optimizer.step()
                # `loss.data[0]` was removed in PyTorch 0.5; .item() is the
                # supported way to read a scalar loss (available since 0.4).
                running_loss += loss.item()
            loss_history[phase].append(running_loss / len(dataset_loaders[phase]))
    return loss_history
#-----------------
# Instantiate one network per model type, sizing input/output layers
# directly from the data.
models = {}
for typ in ["duration", "acoustic"]:
    in_dim = X[typ]["train"][0].shape[-1]
    out_dim = Y[typ]["train"][0].shape[-1]
    models[typ] = MyNet(in_dim, hidden_size, out_dim, num_hidden_layers)
    print("Model for {}\n".format(typ), models[typ])
#-----------------
# Train the duration model first; its predictions drive frame alignment
# for the acoustic model at synthesis time.
ty = "duration"
optimizer = optim.Adam(models[ty].parameters(), lr=lr, weight_decay=weight_decay)
loss_history = train(models[ty], optimizer, X[ty], Y[ty],
                     X_min[ty], X_max[ty], Y_mean[ty], Y_scale[ty], utt_lengths[ty])
#-----------------
# Then train the acoustic model (vocoder-parameter regression).
ty = "acoustic"
optimizer = optim.Adam(models[ty].parameters(), lr=lr, weight_decay=weight_decay)
loss_history = train(models[ty], optimizer, X[ty], Y[ty],
                     X_min[ty], X_max[ty], Y_mean[ty], Y_scale[ty], utt_lengths[ty])
#-----------------
# Persist both trained models as one pickle; the synthesis script loads
# this file and expects the {"duration": ..., "acoustic": ...} dict.
torch.save(models, 'model.pkl')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import pickle as pkl
from os.path import join, expanduser, basename, splitext, exists

import numpy as np
import pysptk
import pyworld
import torch
from scipy.io import wavfile
from torch import nn
from torch.autograd import Variable

from nnmnkwii import paramgen
from nnmnkwii.frontend import merlin as fe
from nnmnkwii.io import hts
from nnmnkwii.postfilters import merlin_post_filter
from nnmnkwii.preprocessing import trim_zeros_frames, remove_zeros_frames
from nnmnkwii.preprocessing import minmax, meanvar, minmax_scale, scale
DATA_ROOT = "./data/slt_arctic_full_data"
#-----------------
# Feature/vocoder configuration.  These values must match the training
# script exactly, otherwise the saved normalization statistics and model
# dimensions will not line up.
mgc_dim = 180   # mel-generalized cepstrum stream (presumably 60 static x 3 windows -- TODO confirm)
lf0_dim = 3     # log-F0 stream (static + delta + delta-delta)
vuv_dim = 1     # voiced/unvoiced flag
bap_dim = 3     # band aperiodicity stream
duration_linguistic_dim = 416   # linguistic input size, duration model
acoustic_linguisic_dim = 425    # linguistic input size, acoustic model
duration_dim = 5                # duration targets per phone (presumably 5 HMM states -- verify)
acoustic_dim = mgc_dim + lf0_dim + vuv_dim + bap_dim  # 187 acoustic targets per frame
fs = 16000          # sampling rate [Hz]
frame_period = 5    # frame shift [ms]
hop_length = 80     # samples per frame: fs * frame_period / 1000
fftlen = 1024       # FFT size for spectral envelope recovery
alpha = 0.41        # all-pass constant for mel-cepstral analysis at 16 kHz
# Start index of each stream inside an acoustic feature vector.
mgc_start_idx = 0
lf0_start_idx = 180
vuv_start_idx = 183
bap_start_idx = 184
# MLPG delta windows: static, delta, delta-delta.
windows = [
    (0, 0, np.array([1.0])),
    (1, 1, np.array([-0.5, 0.0, 0.5])),
    (1, 1, np.array([1.0, -2.0, 1.0])),
]
#-----------------
# Load the normalization statistics written by the training script.
# A context manager closes the file deterministically (the original
# called pkl.load(open(...)) and leaked the handle).
with open('norm_params.pkl', 'rb') as fin:
    norm_params = pkl.load(fin)
X_min, X_max, Y_mean, Y_var, Y_scale = norm_params
# Question set defining the binary/continuous linguistic features.
binary_dict, continuous_dict = hts.load_question_set(join(DATA_ROOT, "questions-radio_dnn_416.hed"))
def gen_parameters(y_predicted):
    """Split predicted acoustic features into streams and run MLPG.

    Returns (mgc, lf0, vuv, bap); the mgc/lf0/bap streams have had their
    static trajectories recovered from static+delta features via MLPG,
    while vuv is returned as-is.
    """
    # Number of time frames.
    T = y_predicted.shape[0]
    ty = "acoustic"

    def _mlpg_stream(stream, variances):
        # Tile the global per-dimension variance over all T frames.
        return paramgen.mlpg(stream, np.tile(variances, (T, 1)), windows)

    mgc = _mlpg_stream(y_predicted[:, :lf0_start_idx],
                       Y_var[ty][:lf0_start_idx])
    lf0 = _mlpg_stream(y_predicted[:, lf0_start_idx:vuv_start_idx],
                       Y_var[ty][lf0_start_idx:vuv_start_idx])
    vuv = y_predicted[:, vuv_start_idx]
    bap = _mlpg_stream(y_predicted[:, bap_start_idx:],
                       Y_var[ty][bap_start_idx:])
    return mgc, lf0, vuv, bap
def gen_waveform(y_predicted, do_postfilter=False):
    """Synthesize a waveform from predicted acoustic features via WORLD."""
    y_predicted = trim_zeros_frames(y_predicted)
    # Generate parameters and split streams.
    mgc, lf0, vuv, bap = gen_parameters(y_predicted)
    if do_postfilter:
        mgc = merlin_post_filter(mgc, alpha)
    # Mel-cepstrum -> spectral envelope; coded aperiodicity -> full band.
    spectrogram = pysptk.mc2sp(mgc, fftlen=fftlen, alpha=alpha)
    aperiodicity = pyworld.decode_aperiodicity(bap.astype(np.float64), fs, fftlen)
    # log-F0 -> linear F0, zeroing unvoiced frames (vuv < 0.5).
    f0 = lf0.copy()
    f0[vuv < 0.5] = 0
    voiced = np.nonzero(f0)
    f0[voiced] = np.exp(f0[voiced])
    return pyworld.synthesize(f0.flatten().astype(np.float64),
                              spectrogram.astype(np.float64),
                              aperiodicity.astype(np.float64),
                              fs, frame_period)
def gen_duration(label_path, duration_model):
    """Predict state durations for an HTS label file.

    Loads the label, extracts phone-level linguistic features, runs the
    duration model, de-normalizes and rounds the prediction, then returns
    the label object with the predicted durations written back.
    """
    hts_labels = hts.load(label_path)
    linguistic = fe.linguistic_features(
        hts_labels, binary_dict, continuous_dict,
        add_frame_features=False,
        subphone_features=None).astype(np.float32)
    # Apply the same min/max normalization used at training time.
    ty = "duration"
    linguistic = minmax_scale(linguistic, X_min[ty], X_max[ty],
                              feature_range=(0.01, 0.99))
    # Run inference on CPU.
    duration_model = duration_model.cpu()
    duration_model.eval()
    inputs = Variable(torch.from_numpy(linguistic)).float()
    predicted = duration_model(inputs).data.numpy()
    # De-normalize, round to whole frames, and clamp to at least 1 frame.
    predicted = np.round(predicted * Y_scale[ty] + Y_mean[ty])
    predicted[predicted <= 0] = 1
    hts_labels.set_durations(predicted)
    return hts_labels
def test_one_utt(label_path, duration_model, acoustic_model, post_filter=True):
    """Synthesize one utterance end-to-end from an HTS label file."""
    # Predict durations and build frame-level linguistic features.
    labels = gen_duration(label_path, duration_model)
    linguistic = fe.linguistic_features(labels,
                                        binary_dict, continuous_dict,
                                        add_frame_features=True,
                                        subphone_features="full")
    # Drop silence frames so the vocoder only sees speech.
    silence = labels.silence_frame_indices()
    linguistic = np.delete(linguistic, silence, axis=0)
    # Normalize exactly as at training time.
    ty = "acoustic"
    linguistic = minmax_scale(linguistic, X_min[ty], X_max[ty],
                              feature_range=(0.01, 0.99))
    # Acoustic inference on CPU, then de-normalize.
    acoustic_model = acoustic_model.cpu()
    acoustic_model.eval()
    inputs = Variable(torch.from_numpy(linguistic)).float()
    predicted = acoustic_model(inputs).data.numpy()
    predicted = predicted * Y_scale[ty] + Y_mean[ty]
    return gen_waveform(predicted, post_filter)
class MyNet(torch.nn.Module):
    """Simple feed-forward network (must mirror the training script's
    definition so the pickled checkpoint loads correctly).

    NOTE(review): the activation attribute is named `relu` but actually
    holds Tanh; the name is kept for checkpoint compatibility.
    """
    def __init__(self, D_in, H, D_out, num_layers=2):
        super(MyNet, self).__init__()
        self.first_linear = nn.Linear(D_in, H)
        self.hidden_layers = nn.ModuleList(
            [nn.Linear(H, H) for _ in range(num_layers)])
        self.last_linear = nn.Linear(H, D_out)
        self.relu = nn.Tanh()

    def forward(self, x):
        hidden = self.relu(self.first_linear(x))
        for layer in self.hidden_layers:
            hidden = self.relu(layer(hidden))
        return self.last_linear(hidden)
# Load the trained models saved by the training script.  torch.save()
# stored a dict {"duration": ..., "acoustic": ...}; the original bound
# the result to `model` but then referenced an undefined `models`
# (NameError) -- fixed by binding to `models`.
models = torch.load('model.pkl')
label_path = 'data/slt_arctic_full_data/label_state_align/arctic_a0218.lab'
waveform = test_one_utt(label_path, models["duration"], models["acoustic"])
# Requires `from scipy.io import wavfile` at the top of this script.
wavfile.write('1.wav', rate=fs, data=waveform)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment