Skip to content

Instantly share code, notes, and snippets.

@attitudechunfeng
Created December 4, 2017 04:17
Show Gist options
  • Save attitudechunfeng/58a052f18f6aa24235000cc50618e887 to your computer and use it in GitHub Desktop.
Save attitudechunfeng/58a052f18f6aa24235000cc50618e887 to your computer and use it in GitHub Desktop.
nn.py is the training script and nn_synth.py is the synthesis (test) script.
from nnmnkwii.datasets import FileDataSource, FileSourceDataset
from nnmnkwii.datasets import PaddedFileSourceDataset, MemoryCacheDataset
from nnmnkwii.preprocessing import trim_zeros_frames, remove_zeros_frames
from nnmnkwii.preprocessing import minmax, meanvar, minmax_scale, scale
from nnmnkwii import paramgen
from nnmnkwii.io import hts
from nnmnkwii.frontend import merlin as fe
from nnmnkwii.postfilters import merlin_post_filter
from nnmnkwii.datasets import MemoryCacheFramewiseDataset
from os.path import join, expanduser, basename, splitext, basename, exists
import os
from glob import glob
import numpy as np
from scipy.io import wavfile
from sklearn.model_selection import train_test_split
import pyworld
import pysptk
from torch.utils import data as data_utils
import torch
from torch import nn
from torch.autograd import Variable
from tqdm import tnrange, tqdm
from torch import optim
import pickle as pkl
#-----------------
# Root of the prepared CMU ARCTIC "slt" dataset (Merlin-style binary dumps).
DATA_ROOT = "./data/slt_arctic_full_data"
# Fraction of (non-heldout) utterances used as the validation split.
test_size = 0.112
# Seed for train_test_split, so partitions are reproducible across runs.
random_state = 1234
#-----------------
# Feature stream dimensions.  mgc/lf0/bap are 3x their static size because
# static + delta + delta-delta coefficients are stacked (see `windows` below).
mgc_dim = 180
lf0_dim = 3
vuv_dim = 1
bap_dim = 3
# Input linguistic feature sizes for the duration and acoustic models.
duration_linguistic_dim = 416
acoustic_linguisic_dim = 425
# Durations per phone label — presumably one per HMM state (TODO confirm).
duration_dim = 5
acoustic_dim = mgc_dim + lf0_dim + vuv_dim + bap_dim
# Vocoder / analysis settings (16 kHz audio, 5 ms frame shift).
fs = 16000
frame_period = 5
hop_length = 80
fftlen = 1024
# Frequency-warping (all-pass) constant passed to pysptk.mc2sp.
alpha = 0.41
# Start indices of each stream inside the acoustic feature vector.
mgc_start_idx = 0
lf0_start_idx = 180
vuv_start_idx = 183
bap_start_idx = 184
# Delta windows used for MLPG: static, delta, delta-delta.
windows = [
(0, 0, np.array([1.0])),
(1, 1, np.array([-0.5, 0.0, 0.5])),
(1, 1, np.array([1.0, -2.0, 1.0])),
]
#-----------------
class BinaryFileSource(FileDataSource):
    """File data source for flat float32 binary feature dumps.

    Finds ``*.bin`` files under ``data_root``, reserves the last five
    files as a real held-out test set, splits the remainder into
    train/validation partitions, and loads each file as a
    ``(num_frames, dim)`` float32 matrix.
    """

    def __init__(self, data_root, dim, train):
        self.data_root = data_root
        self.dim = dim
        self.train = train

    def collect_files(self):
        paths = sorted(glob(join(self.data_root, "*.bin")))
        # Keep the final five utterances out of both partitions entirely.
        paths = paths[:len(paths) - 5]
        split = train_test_split(paths, test_size=test_size,
                                 random_state=random_state)
        return split[0] if self.train else split[1]

    def collect_features(self, path):
        flat = np.fromfile(path, dtype=np.float32)
        return flat.reshape(-1, self.dim)
#-----------------
# Build file-backed datasets for both model types ("duration"/"acoustic")
# and both phases ("train"/"test"); also record per-utterance frame counts,
# which the framewise cache datasets need later.
X = {"duration": {}, "acoustic": {}}
Y = {"duration": {}, "acoustic": {}}
utt_lengths = {"duration": {}, "acoustic": {}}
for ty in ["duration", "acoustic"]:
    for phase in ["train", "test"]:
        train = phase == "train"
        x_dim = duration_linguistic_dim if ty == "duration" else acoustic_linguisic_dim
        y_dim = duration_dim if ty == "duration" else acoustic_dim
        X[ty][phase] = FileSourceDataset(
            BinaryFileSource(join(DATA_ROOT, "X_{}".format(ty)),
                             dim=x_dim, train=train))
        Y[ty][phase] = FileSourceDataset(
            BinaryFileSource(join(DATA_ROOT, "Y_{}".format(ty)),
                             dim=y_dim, train=train))
        # BUG FIX: np.int is deprecated and was removed in NumPy >= 1.24;
        # the builtin int is the documented replacement.
        utt_lengths[ty][phase] = np.array([len(x) for x in X[ty][phase]],
                                          dtype=int)
#-----------------
# Compute input min/max and output mean/variance statistics on the training
# partition of each model type, then persist them for the synthesis script.
X_min = {}
X_max = {}
Y_mean = {}
Y_var = {}
Y_scale = {}
for typ in ["acoustic", "duration"]:
    X_min[typ], X_max[typ] = minmax(X[typ]["train"], utt_lengths[typ]["train"])
    Y_mean[typ], Y_var[typ] = meanvar(Y[typ]["train"], utt_lengths[typ]["train"])
    # Standard deviation used for z-score scaling of targets.
    Y_scale[typ] = np.sqrt(Y_var[typ])
norm_params = [X_min, X_max, Y_mean, Y_var, Y_scale]
with open('norm_params.pkl', 'wb') as fout:
    pkl.dump(norm_params, fout)
#-----------------
class PyTorchDataset(torch.utils.data.Dataset):
    """Thin PyTorch ``Dataset`` wrapper around framewise feature arrays.

    Responsibilities:
      1. on-demand normalization (min/max scaling of inputs,
         mean/scale standardization of targets), and
      2. conversion from ``ndarray`` to ``torch`` tensors.
    """

    def __init__(self, X, Y, X_min, X_max, Y_mean, Y_scale):
        self.X = X
        self.Y = Y
        self.X_min = X_min
        self.X_max = X_max
        self.Y_mean = Y_mean
        self.Y_scale = Y_scale

    def __getitem__(self, idx):
        # Normalize lazily so the backing cache keeps raw features only.
        raw_x, raw_y = self.X[idx], self.Y[idx]
        norm_x = minmax_scale(raw_x, self.X_min, self.X_max,
                              feature_range=(0.01, 0.99))
        norm_y = scale(raw_y, self.Y_mean, self.Y_scale)
        return torch.from_numpy(norm_x), torch.from_numpy(norm_y)

    def __len__(self):
        return len(self.X)
#-----------------
class MyNet(torch.nn.Module):
    """Simple feed-forward DNN: Linear -> activation stack -> Linear.

    Note: the activation attribute is named ``relu`` for historical
    reasons, but it is actually ``nn.Tanh``.
    """

    def __init__(self, D_in, H, D_out, num_layers=2):
        super(MyNet, self).__init__()
        self.first_linear = nn.Linear(D_in, H)
        self.hidden_layers = nn.ModuleList(
            [nn.Linear(H, H) for _ in range(num_layers)])
        self.last_linear = nn.Linear(H, D_out)
        self.relu = nn.Tanh()

    def forward(self, x):
        hidden = self.relu(self.first_linear(x))
        for layer in self.hidden_layers:
            hidden = self.relu(layer(hidden))
        # No activation on the output layer (linear regression head).
        return self.last_linear(hidden)
#-----------------
# Training hyperparameters.
num_hidden_layers = 3
hidden_size = 256
batch_size = 256
# We use PyTorch's multiprocess iterator. Note that large n_workers causes
# dataset copies across processes.
n_workers = 4
# Pin host memory so host->GPU copies can be asynchronous.
pin_memory = True
nepoch = 25
lr = 0.001
# L2 regularization applied through the Adam optimizer.
weight_decay = 1e-6
# Train on GPU whenever one is available.
use_cuda = torch.cuda.is_available()
#-----------------
def train(model, optimizer, X, Y, X_min, X_max, Y_mean, Y_scale,
          utt_lengths, cache_size=1000):
    """Train ``model`` frame-wise with MSE loss and track per-epoch losses.

    Args:
        model: torch module mapping linguistic features to target features.
        optimizer: torch optimizer over ``model``'s parameters.
        X, Y: dicts with "train"/"test" utterance-level feature datasets.
        X_min, X_max: input min/max statistics for normalization.
        Y_mean, Y_scale: output mean/std statistics for normalization.
        utt_lengths: dict of per-utterance frame counts ("train"/"test").
        cache_size: number of utterances kept in the framewise memory cache.

    Returns:
        dict of per-epoch average losses: {"train": [...], "test": [...]}.
    """
    if use_cuda:
        model = model.cuda()
    X_train, X_test = X["train"], X["test"]
    Y_train, Y_test = Y["train"], Y["test"]
    train_lengths, test_lengths = utt_lengths["train"], utt_lengths["test"]
    # Frame-wise train data loader
    X_train_cache_dataset = MemoryCacheFramewiseDataset(
        X_train, train_lengths, cache_size)
    Y_train_cache_dataset = MemoryCacheFramewiseDataset(
        Y_train, train_lengths, cache_size)
    train_dataset = PyTorchDataset(X_train_cache_dataset, Y_train_cache_dataset,
                                   X_min, X_max, Y_mean, Y_scale)
    train_loader = data_utils.DataLoader(
        train_dataset, batch_size=batch_size, num_workers=n_workers,
        pin_memory=pin_memory, shuffle=True)
    print("Train dataset number of frames", len(train_dataset))
    # Frame-wise test data loader
    X_test_cache_dataset = MemoryCacheFramewiseDataset(
        X_test, test_lengths, cache_size)
    Y_test_cache_dataset = MemoryCacheFramewiseDataset(
        Y_test, test_lengths, cache_size)
    test_dataset = PyTorchDataset(X_test_cache_dataset, Y_test_cache_dataset,
                                  X_min, X_max, Y_mean, Y_scale)
    test_loader = data_utils.DataLoader(
        test_dataset, batch_size=batch_size, num_workers=n_workers,
        pin_memory=pin_memory, shuffle=False)
    # Typo fix: "numer" -> "number".
    print("Test dataset number of frames", len(test_dataset))
    dataset_loaders = {"train": train_loader, "test": test_loader}
    # Training loop
    criterion = nn.MSELoss()
    model.train()
    print("Start frame-wise training...")
    loss_history = {"train": [], "test": []}
    for epoch in tnrange(nepoch):
        for phase in ["train", "test"]:
            if phase == "train":
                model.train()
            else:
                model.eval()
            running_loss = 0.0
            for x, y in dataset_loaders[phase]:
                if use_cuda:
                    x, y = x.cuda(), y.cuda()
                x, y = Variable(x), Variable(y)
                optimizer.zero_grad()
                y_hat = model(x)
                loss = criterion(y_hat, y)
                if phase == "train":
                    loss.backward()
                    optimizer.step()
                # BUG FIX: indexing a 0-dim tensor (loss.data[0]) raises on
                # PyTorch >= 0.5; .item() is the supported scalar accessor.
                running_loss += loss.item()
            # Average of per-batch losses for this epoch and phase.
            loss_history[phase].append(running_loss / len(dataset_loaders[phase]))
    return loss_history
#-----------------
# Instantiate one network per model type, sized from the data dimensions.
models = {}
for typ in ["duration", "acoustic"]:
    in_dim = X[typ]["train"][0].shape[-1]
    out_dim = Y[typ]["train"][0].shape[-1]
    models[typ] = MyNet(in_dim, hidden_size, out_dim, num_hidden_layers)
    print("Model for {}\n".format(typ), models[typ])
#-----------------
# Train the duration model first (its predictions drive frame expansion
# at synthesis time).
ty = "duration"
optimizer = optim.Adam(models[ty].parameters(), lr=lr, weight_decay=weight_decay)
loss_history = train(models[ty], optimizer, X[ty], Y[ty],
X_min[ty], X_max[ty], Y_mean[ty], Y_scale[ty], utt_lengths[ty])
#-----------------
# Then train the acoustic model with the same optimizer settings.
ty = "acoustic"
optimizer = optim.Adam(models[ty].parameters(), lr=lr, weight_decay=weight_decay)
loss_history = train(models[ty], optimizer, X[ty], Y[ty],
X_min[ty], X_max[ty], Y_mean[ty], Y_scale[ty], utt_lengths[ty])
#-----------------
# Persist the dict of trained nn.Modules for the synthesis script.
torch.save(models, 'model.pkl')
import os
from os.path import join, expanduser, basename, splitext, exists
import pickle as pkl

import numpy as np
import pysptk
import pyworld
import torch
from scipy.io import wavfile
from torch import nn
from torch.autograd import Variable

from nnmnkwii import paramgen
from nnmnkwii.frontend import merlin as fe
from nnmnkwii.io import hts
from nnmnkwii.postfilters import merlin_post_filter
from nnmnkwii.preprocessing import trim_zeros_frames, remove_zeros_frames
from nnmnkwii.preprocessing import minmax, meanvar, minmax_scale, scale
# Root of the prepared dataset (must match the training script).
DATA_ROOT = "./data/slt_arctic_full_data"
#-----------------
# Feature stream dimensions; mgc/lf0/bap are 3x their static size because
# static + delta + delta-delta coefficients are stacked (see `windows`).
mgc_dim = 180
lf0_dim = 3
vuv_dim = 1
bap_dim = 3
duration_linguistic_dim = 416
acoustic_linguisic_dim = 425
duration_dim = 5
acoustic_dim = mgc_dim + lf0_dim + vuv_dim + bap_dim
# Vocoder / analysis settings (16 kHz audio, 5 ms frame shift).
fs = 16000
frame_period = 5
hop_length = 80
fftlen = 1024
# Frequency-warping (all-pass) constant passed to pysptk.mc2sp.
alpha = 0.41
# Start indices of each stream inside the acoustic feature vector.
mgc_start_idx = 0
lf0_start_idx = 180
vuv_start_idx = 183
bap_start_idx = 184
# Delta windows used for MLPG: static, delta, delta-delta.
windows = [
(0, 0, np.array([1.0])),
(1, 1, np.array([-0.5, 0.0, 0.5])),
(1, 1, np.array([1.0, -2.0, 1.0])),
]
#-----------------
# Load the normalization statistics saved by the training script.
# FIX: the original passed a bare open() to pkl.load, leaking the file
# handle; a with-statement closes it deterministically.
with open('norm_params.pkl', 'rb') as fin:
    norm_params = pkl.load(fin)
X_min = norm_params[0]
X_max = norm_params[1]
Y_mean = norm_params[2]
Y_var = norm_params[3]
Y_scale = norm_params[4]
# Merlin-style question set defining binary/continuous linguistic features.
binary_dict, continuous_dict = hts.load_question_set(join(DATA_ROOT, "questions-radio_dnn_416.hed"))
def gen_parameters(y_predicted):
    """Split predicted acoustic features into streams and run MLPG.

    ``y_predicted`` holds per-frame stacked static+delta features in the
    order mgc | lf0 | vuv | bap.  MLPG recovers smooth static trajectories
    for mgc/lf0/bap using the global output variances ``Y_var["acoustic"]``.
    Returns ``(mgc, lf0, vuv, bap)``.
    """
    # Number of time frames
    T = y_predicted.shape[0]
    ty = "acoustic"
    # Slice the concatenated feature matrix into its streams.
    mgc = y_predicted[:, :lf0_start_idx]
    lf0 = y_predicted[:, lf0_start_idx:vuv_start_idx]
    vuv = y_predicted[:, vuv_start_idx]
    bap = y_predicted[:, bap_start_idx:]
    # Maximum-likelihood parameter generation, one stream at a time
    # (vuv is a per-frame scalar and is not smoothed).
    mgc_variances = np.tile(Y_var[ty][:lf0_start_idx], (T, 1))
    mgc = paramgen.mlpg(mgc, mgc_variances, windows)
    lf0_variances = np.tile(Y_var[ty][lf0_start_idx:vuv_start_idx], (T, 1))
    lf0 = paramgen.mlpg(lf0, lf0_variances, windows)
    bap_variances = np.tile(Y_var[ty][bap_start_idx:], (T, 1))
    bap = paramgen.mlpg(bap, bap_variances, windows)
    return mgc, lf0, vuv, bap
def gen_waveform(y_predicted, do_postfilter=False):
    """Vocode a waveform from predicted acoustic features via WORLD."""
    y_predicted = trim_zeros_frames(y_predicted)
    # Generate smoothed parameters and split streams.
    mgc, lf0, vuv, bap = gen_parameters(y_predicted)
    if do_postfilter:
        # Merlin-style formant-enhancement postfilter on the mel-cepstrum.
        mgc = merlin_post_filter(mgc, alpha)
    spectrogram = pysptk.mc2sp(mgc, fftlen=fftlen, alpha=alpha)
    aperiodicity = pyworld.decode_aperiodicity(bap.astype(np.float64), fs, fftlen)
    # log-F0 -> linear F0, zeroing frames judged unvoiced (vuv < 0.5).
    f0 = lf0.copy()
    f0[vuv < 0.5] = 0
    voiced = np.nonzero(f0)
    f0[voiced] = np.exp(f0[voiced])
    generated_waveform = pyworld.synthesize(f0.flatten().astype(np.float64),
                                            spectrogram.astype(np.float64),
                                            aperiodicity.astype(np.float64),
                                            fs, frame_period)
    return generated_waveform
def gen_duration(label_path, duration_model):
    """Predict state durations for an HTS label file and attach them.

    Loads the label, extracts phone-level linguistic features, runs the
    duration model, denormalizes and rounds the predictions, and returns
    the labels with the predicted durations set.
    """
    hts_labels = hts.load(label_path)
    # Phone-level linguistic features (no frame expansion for duration).
    feats = fe.linguistic_features(hts_labels,
                                   binary_dict, continuous_dict,
                                   add_frame_features=False,
                                   subphone_features=None).astype(np.float32)
    # Same input normalization as used at training time.
    ty = "duration"
    feats = minmax_scale(feats, X_min[ty], X_max[ty],
                         feature_range=(0.01, 0.99))
    # Run the model on CPU in eval mode.
    duration_model = duration_model.cpu()
    duration_model.eval()
    x = Variable(torch.from_numpy(feats)).float()
    predicted = duration_model(x).data.numpy()
    # Denormalize, round to integer frame counts, floor at 1 frame.
    predicted = predicted * Y_scale[ty] + Y_mean[ty]
    predicted = np.round(predicted)
    predicted[predicted <= 0] = 1
    hts_labels.set_durations(predicted)
    return hts_labels
def test_one_utt(label_path, duration_model, acoustic_model, post_filter=True):
    """Synthesize one utterance: durations -> acoustic features -> waveform."""
    # Predict state durations and attach them to the labels.
    duration_modified_hts_labels = gen_duration(label_path, duration_model)
    # Frame-level linguistic features using the predicted durations.
    linguistic_features = fe.linguistic_features(duration_modified_hts_labels,
                                                binary_dict, continuous_dict,
                                                add_frame_features=True,
                                                subphone_features="full")
    # Drop silence frames before prediction.
    silence_idx = duration_modified_hts_labels.silence_frame_indices()
    linguistic_features = np.delete(linguistic_features, silence_idx, axis=0)
    # Same input normalization as used at training time.
    ty = "acoustic"
    linguistic_features = minmax_scale(linguistic_features,
                                       X_min[ty], X_max[ty],
                                       feature_range=(0.01, 0.99))
    # Run the acoustic model on CPU in eval mode.
    acoustic_model = acoustic_model.cpu()
    acoustic_model.eval()
    x = Variable(torch.from_numpy(linguistic_features)).float()
    acoustic_predicted = acoustic_model(x).data.numpy()
    # Denormalize and vocode.
    acoustic_predicted = acoustic_predicted * Y_scale[ty] + Y_mean[ty]
    return gen_waveform(acoustic_predicted, post_filter)
class MyNet(torch.nn.Module):
    """Feed-forward DNN: input Linear, ``num_layers`` hidden Linears, output Linear.

    Every affine layer except the last is followed by the activation in
    ``self.relu`` (actually ``nn.Tanh`` despite the attribute name).
    Must match the architecture used by the training script so saved
    models load correctly.
    """

    def __init__(self, D_in, H, D_out, num_layers=2):
        super(MyNet, self).__init__()
        self.first_linear = nn.Linear(D_in, H)
        self.hidden_layers = nn.ModuleList(
            [nn.Linear(H, H) for _ in range(num_layers)])
        self.last_linear = nn.Linear(H, D_out)
        self.relu = nn.Tanh()

    def forward(self, x):
        act = self.relu
        out = act(self.first_linear(x))
        for hidden in self.hidden_layers:
            out = act(hidden(out))
        return self.last_linear(out)
# Load the dict of trained models saved by the training script.
# BUG FIX: the original assigned to `model` but then indexed `models`,
# which is undefined in this standalone script — load into `models`.
models = torch.load('model.pkl')
label_path = 'data/slt_arctic_full_data/label_state_align/arctic_a0218.lab'
waveform = test_one_utt(label_path, models["duration"], models["acoustic"])
wavfile.write('1.wav', rate=fs, data=waveform)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment