PLDA scoring on pyannote-audio speaker embeddings (https://github.com/pyannote/pyannote-audio), using a customized version of the plda package (https://github.com/RaviSoji/plda) that adds length normalization and latent-space dimension tuning.
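A minimal usage sketch of the model defined in the two files below (synthetic data only; every name and parameter value here is illustrative, and it assumes the files are importable as plda.model and plda.optimizer, which is how the evaluation script at the bottom imports them):

import numpy as np
from plda.model import PLDA

rng = np.random.default_rng(0)

# Synthetic stand-in for speaker embeddings: 20 "speakers", 100 embeddings each.
n_speakers, per_speaker, dim = 20, 100, 64
speaker_means = rng.normal(scale=3.0, size=(n_speakers, dim))
embeddings = np.vstack([m + rng.normal(size=(per_speaker, dim)) for m in speaker_means])
labels = np.repeat(np.arange(n_speakers), per_speaker)

# Fit PLDA; the PCA dimensionality and the fraction of retained latent dimensions
# are the two tuning knobs mentioned in the description above.
plda = PLDA(embeddings, labels, n_principal_components=32, n_relevant_dims_fraction=0.8)

# Score two sets of trial embeddings: map them to the model's latent space and
# compute the same/different-speaker log-likelihood ratio.
emb1 = plda.transform(speaker_means[0] + rng.normal(size=(5, dim)), from_space='D', to_space='U_model')
emb2 = plda.transform(speaker_means[0] + rng.normal(size=(3, dim)), from_space='D', to_space='U_model')
print(plda.calc_same_diff_likelihood_ratio(emb1, emb2))  # higher => more likely the same speaker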
# Copyright 2017 Ravi Sojitra. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import numpy as np
from scipy.stats import multivariate_normal as gaussian
from sklearn.decomposition import PCA

from plda.optimizer import calc_scatter_matrices
from plda.optimizer import get_posterior_params
from plda.optimizer import get_posterior_predictive_params
from plda.optimizer import get_prior_params
from plda.optimizer import optimize_maximum_likelihood


def get_space_walk(from_space, to_space):
    U_model_to_D = ['U_model', 'U', 'X', 'D']
    D_to_U_model = U_model_to_D[::-1]

    assert from_space in U_model_to_D and to_space in U_model_to_D

    from_idx = U_model_to_D.index(from_space)
    to_idx = U_model_to_D.index(to_space)

    if to_idx < from_idx:
        spaces = D_to_U_model
        from_idx = D_to_U_model.index(from_space)
        to_idx = D_to_U_model.index(to_space)
    else:
        spaces = U_model_to_D

    from_spaces = [x for x in spaces[from_idx: to_idx]]
    to_spaces = [x for x in spaces[from_idx + 1: to_idx + 1]]

    return zip(from_spaces, to_spaces)


def transform_D_to_X(data, pca):
    return data if pca is None else pca.transform(data)


def transform_X_to_U(data, inv_A, m):
    return np.matmul(data - m, inv_A.T)


def transform_U_to_U_model(data, relevant_U_dims):
    return data[..., relevant_U_dims]


def transform_U_model_to_U(data, relevant_U_dims, U_dimensionality):
    shape = (*data.shape[:-1], U_dimensionality)
    U = np.zeros(shape)
    U[..., relevant_U_dims] = data
    return U


def transform_U_to_X(data, A, m):
    return m + np.matmul(data, A.T)


def transform_X_to_D(data, pca):
    return data if pca is None else pca.inverse_transform(data)


class PLDA:
    def __init__(self, row_wise_data, labels, n_principal_components=None, n_relevant_dims_fraction=None):
        assert len(row_wise_data.shape) == 2
        assert len(labels) == row_wise_data.shape[0]

        self.pca = None
        self.m = None
        self.A = None
        self.Psi = None
        self.relevant_U_dims = None
        self.inv_A = None

        self.prior_params = None
        self.posterior_params = None
        self.posterior_predictive_params = None

        self.fit(row_wise_data, labels, n_principal_components, n_relevant_dims_fraction)

    def calc_logp_posterior(self, v_model, category):
        assert v_model.shape[-1] == self.get_dimensionality('U_model')

        mean = self.posterior_params[category]['mean']
        cov_diag = self.posterior_params[category]['cov_diag']

        return gaussian(mean, np.diag(cov_diag)).logpdf(v_model)

    def calc_logp_posterior_predictive(self, U_model, category):
        assert U_model.shape[-1] == self.get_dimensionality('U_model')

        mean = self.posterior_predictive_params[category]['mean']
        cov_diag = self.posterior_predictive_params[category]['cov_diag']

        return gaussian(mean, np.diag(cov_diag)).logpdf(U_model)

    def calc_logp_marginal_likelihood(self, U_model):
        """ Computes the log marginal likelihood on axis=-2. """
        assert U_model.shape[-1] == self.get_dimensionality('U_model')

        if len(U_model.shape) == 1:
            U_model = U_model[None, :]

        n = U_model.shape[-2]
        psi_diag = self.prior_params['cov_diag']
        n_psi_plus_eye = n * psi_diag + 1

        log_constant = -.5 * n * np.log(2 * np.pi)
        log_constant += -.5 * np.log(n_psi_plus_eye)

        sum_of_squares = np.sum(U_model ** 2, axis=-2)
        log_exponent_1 = -.5 * sum_of_squares

        mean = U_model.mean(axis=-2)
        log_exponent_2 = .5 * (n ** 2 * psi_diag * mean ** 2)
        log_exponent_2 /= n_psi_plus_eye

        logp_ml = log_constant + log_exponent_1 + log_exponent_2
        logp_ml = np.sum(logp_ml, axis=-1)

        return logp_ml

    def calc_logp_prior(self, v_model):
        assert v_model.shape[-1] == self.get_dimensionality('U_model')

        mean = self.prior_params['mean']
        cov_diag = self.prior_params['cov_diag']

        return gaussian(mean, np.diag(cov_diag)).logpdf(v_model)

    def calc_same_diff_likelihood_ratio(self, U_model_p, U_model_g):
        assert U_model_p.shape[-1] == self.get_dimensionality('U_model')
        assert U_model_g.shape[-1] == self.get_dimensionality('U_model')

        U_model_same = np.concatenate([
            U_model_p,
            U_model_g,
        ])

        ll_same = self.calc_logp_marginal_likelihood(U_model_same)
        ll_p = self.calc_logp_marginal_likelihood(U_model_p)
        ll_g = self.calc_logp_marginal_likelihood(U_model_g)

        ll_same = ll_same - (ll_p + ll_g)

        return ll_same

    def length_normalize(self, data):
        # Normalize the length of each vector to equal sqrt(embedding_dim).
        sq_emb_dim = np.sqrt(data.shape[1])
        # Calculate the norm as a column vector and repeat it `embedding_dim` times
        # to broadcast the division later.
        norms = np.linalg.norm(data, axis=1)
        norms = np.expand_dims(norms, axis=1).repeat(data.shape[1], axis=1)
        # Normalize using the norms.
        data = sq_emb_dim * (data / norms)
        # Verify normalization was done correctly, at least for the first 1000 embeddings.
        # Comparing against the scalar `sq_emb_dim` also works when there are fewer than 1000 rows.
        assert np.allclose(np.linalg.norm(data[:1000], axis=1), sq_emb_dim)
        return data

    def fit(self, data, labels, n_principal_components=None, n_relevant_dims_fraction=None):
        if n_principal_components is None:
            S_b, S_w = calc_scatter_matrices(data, labels)
            matrix_rank = np.linalg.matrix_rank(S_w)
        else:
            matrix_rank = n_principal_components

        if matrix_rank != data.shape[-1]:
            self.pca = PCA(n_components=matrix_rank)
            self.pca.fit(data)

        X = self.transform(data, from_space='D', to_space='X')
        X = self.length_normalize(X)

        self.m, self.A, self.Psi, self.relevant_U_dims, self.inv_A = \
            optimize_maximum_likelihood(X, labels, n_relevant_dims_fraction)

        U_model = self.transform(X, from_space='X', to_space='U_model')

        self.prior_params = \
            get_prior_params(self.Psi, self.relevant_U_dims)
        self.posterior_params = \
            get_posterior_params(U_model, labels, self.prior_params)
        self.posterior_predictive_params = \
            get_posterior_predictive_params(self.posterior_params)

    def get_dimensionality(self, space):
        if space == 'U_model':
            return self.relevant_U_dims.shape[0]
        elif space == 'U':
            return self.A.shape[0]
        elif space == 'X':
            return self.A.shape[0]
        elif space == 'D':
            if self.pca is None:
                return self.m.shape[0]
            else:
                return self.pca.n_features_
        else:
            raise ValueError

    def transform(self, data, from_space, to_space):
        """ Potential spaces: 'D' <---> 'X' <---> 'U' <---> 'U_model'.

        DESCRIPTION
         There are 6 basic transformations to move back and forth
          between the data space, 'D', and the model's space, 'U_model':

         1. From D to X.
            (i.e. from data space to preprocessed space)
            Uses the minimum number of components from
             Principal Components Analysis that
             captures 100% of the variance in the data.

         2. From X to U.
            (i.e. from preprocessed space to latent space)
            See the bottom of p.533 of Ioffe 2006.

         3. From U to U_model.
            (i.e. from latent space to the model space)
            See Fig 2 on p.537 of Ioffe 2006.

         4. From U_model to U.
            (i.e. from the model space to latent space)

         5. From U to X.
            (i.e. from the latent space to the preprocessed space)

         6. From X to D.
            (i.e. from the preprocessed space to the data space)
        """
        if len(data.shape) == 1:
            data = data[None, :]

        if from_space == 'D' and to_space == 'X':
            return transform_D_to_X(data, self.pca)

        elif from_space == 'X' and to_space == 'U':
            return transform_X_to_U(data, self.inv_A, self.m)

        elif from_space == 'U' and to_space == 'U_model':
            return transform_U_to_U_model(data, self.relevant_U_dims)

        elif from_space == 'U_model' and to_space == 'U':
            dim = self.get_dimensionality('U')
            return transform_U_model_to_U(data, self.relevant_U_dims, dim)

        elif from_space == 'U' and to_space == 'X':
            return transform_U_to_X(data, self.A, self.m)

        elif from_space == 'X' and to_space == 'D':
            return transform_X_to_D(data, self.pca)

        else:
            transformed = data
            for space_1, space_2 in get_space_walk(from_space, to_space):
                transformed = self.transform(transformed, space_1, space_2)
            return transformed
# Copyright 2017 Ravi Sojitra. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
from copy import deepcopy

import numpy as np
from scipy.linalg import eigh


def optimize_maximum_likelihood(X, labels, n_relevant_dims_fraction):
    """ Performs the optimization in Fig. 2 of p.537 of Ioffe 2006.

    DESCRIPTION
     - The main model parameters are `m`, `A`, and `Psi`.
     - However, to improve the performance (speed and numerical stability)
        of the plda.Model object,
        inv_A and relevant_U_dims are also returned here.

    ADDITIONAL NOTES
     Be sure to test that np.cov(X.T) is full rank before running this.

     Recall that there are 4 "spaces":
      'D' (data) <---> 'X' (preprocessed) <---> 'U' (latent) <---> 'U_model'

    ARGUMENTS
     X  (numpy.ndarray), shape=(n_data, n_dimensions)
       - Data in statistics format, i.e. row-wise.

     labels  (list or numpy.ndarray), length=X.shape[0]
       - Labels for the data in `X`.
       - Must be sorted in the same order as `X`.

    RETURNS
     m  (numpy.ndarray), shape=X.shape[-1]
       - The mean of the row vectors in X.
       - This is the prior mean fitted via maximum likelihood.

     A  (numpy.ndarray), shape=(X.shape[-1], X.shape[-1])
       - Transformation from X space to the latent U space.

     Psi  (numpy.ndarray), shape=(X.shape[-1], X.shape[-1])
       - The covariance matrix of the prior distribution on
          the category means in U space.

     relevant_U_dims  (numpy.ndarray), shape=(len(np.unique(labels)) - 1,)
       - The "effective" latent dimensions,
          i.e. the ones that are actually used by the model.

     inv_A  (numpy.ndarray), shape=A.shape
       - The inverse of the matrix A.
       - Transformation from the latent U space to the X space.
    """
    assert len(X.shape) == 2
    assert X.shape[0] == len(labels)

    m = calc_m(X)
    S_b, S_w = calc_scatter_matrices(X, labels)
    W = calc_W(S_b, S_w)
    Lambda_b = calc_Lambda_b(S_b, W)
    Lambda_w = calc_Lambda_w(S_w, W)
    n_avg = calc_n_avg(labels)
    A = calc_A(n_avg, Lambda_w, W)
    inv_A = np.linalg.inv(A)
    Psi = calc_Psi(Lambda_w, Lambda_b, n_avg)
    relevant_U_dims = get_relevant_U_dims(Psi, n_relevant_dims_fraction)

    return m, A, Psi, relevant_U_dims, inv_A


def as_dictionary_of_dictionaries(labels, means, cov_diags):
    """ Dictionary storing one dictionary of parameters per category. """
    assert len(labels) == len(means) == len(cov_diags)

    all_params = dict()
    for label, mean, cov_diag in zip(labels, means, cov_diags):
        category_params = dict()
        category_params['mean'] = mean
        category_params['cov_diag'] = cov_diag
        all_params[label] = category_params

    return all_params


def calc_A(n_avg, Lambda_w, W):
    """ See Fig. 2 on p.537 of Ioffe 2006. """
    Lambda_w_diagonal = Lambda_w.diagonal()  # Should be a diagonal matrix.
    inv_W_T = np.linalg.inv(W.T)

    return inv_W_T * (n_avg / (n_avg - 1) * Lambda_w_diagonal) ** .5


def calc_Lambda_b(S_b, W):
    """ See Fig. 2 on p.537 of Ioffe 2006. """
    return np.matmul(np.matmul(W.T, S_b), W)


def calc_Lambda_w(S_w, W):
    """ See Fig. 2 on p.537 of Ioffe 2006. """
    return np.matmul(np.matmul(W.T, S_w), W)


def calc_m(X):
    """ See Fig. 2 on p.537 of Ioffe 2006. """
    assert len(X.shape) == 2

    return X.mean(axis=0)


def calc_n_avg(Y):
    """ This is the "hack" suggested in Fig 2 on p.537 of Ioffe 2006. """
    unique = np.unique(Y)

    return len(Y) / unique.shape[0]


def calc_Psi(Lambda_w, Lambda_b, n_avg):
    """ See Fig. 2 on p.537 of Ioffe 2006. """
    Lambda_w_diagonal = Lambda_w.diagonal()  # Should be a diagonal matrix.
    Lambda_b_diagonal = Lambda_b.diagonal()  # Should be a diagonal matrix.

    Psi = (n_avg - 1) / n_avg * Lambda_b_diagonal / Lambda_w_diagonal
    Psi -= 1 / n_avg
    Psi[Psi <= 0] = 0

    return np.diag(Psi)


def calc_scatter_matrices(X, Y):
    """ See Equations (1) on p.532 of Ioffe 2006. """
    assert len(X.shape) == 2
    assert X.shape[0] == len(Y)

    unique_labels = np.unique(Y)
    labels = np.asarray(Y)

    m = calc_m(X)
    N = X.shape[0]

    cov_ks = []
    m_ks = []
    n_ks = []

    for k in unique_labels:
        bool_idxs = labels == k
        X_k = X[bool_idxs]

        m_ks.append(X_k.mean(axis=0))
        n_ks.append(bool_idxs.sum())
        cov_ks.append(np.cov(X_k.T))

    n_ks = np.asarray(n_ks)
    m_ks = np.asarray(m_ks)

    m_ks_minus_m = m_ks - m
    S_b = np.matmul(m_ks_minus_m.T * (n_ks / N), m_ks_minus_m)

    S_w = np.asarray(cov_ks) * ((n_ks - 1) / N)[:, None, None]
    S_w = np.sum(S_w, axis=0)

    return S_b, S_w


def calc_W(S_b, S_w):
    """ See Fig. 2 on p.537 of Ioffe 2006. """
    eigenvalues, eigenvectors = eigh(S_b, S_w)

    return eigenvectors


def get_posterior_params(U_model, Y, prior_params):
    labels = np.asarray(Y)
    prior_cov_diagonal = prior_params['cov_diag']

    cov_diags = []
    means = []
    categories = []

    for k in np.unique(labels):
        bool_idxs = labels == k

        U_model_k = U_model[bool_idxs]
        n_k = bool_idxs.sum()

        cov_diag = prior_cov_diagonal / (1 + n_k * prior_cov_diagonal)
        mean = U_model_k.sum(axis=0) * cov_diag

        cov_diags.append(cov_diag)
        means.append(mean)
        categories.append(k)

    return as_dictionary_of_dictionaries(categories, means, cov_diags)


def get_posterior_predictive_params(posterior_params):
    """ Likelihood covariance matrix is an Identity matrix. """
    # The copy must be deep: `cov_diag` is a numpy array, so an in-place `+= 1`
    # on a shallow copy would also modify the posterior parameters themselves.
    pp_params = deepcopy(posterior_params)

    for k, k_params in pp_params.items():
        k_params['cov_diag'] += 1

    return pp_params


def get_prior_params(Psi, dims):
    """ See Equation (2) on p.533 of Ioffe 2006. """
    cov_diag = Psi.diagonal()[dims]
    mean = np.zeros(dims.shape)

    return {'mean': mean, 'cov_diag': cov_diag}


def get_relevant_U_dims(Psi, n_relevant_dims_fraction):
    """ See Fig. 2 on p.537 of Ioffe 2006. """
    relevant_dims = np.squeeze(np.argwhere(Psi.diagonal() != 0))

    if n_relevant_dims_fraction is not None:
        n_dims_keep = int(n_relevant_dims_fraction * relevant_dims.shape[0])
        relevant_dims = np.argsort(Psi.diagonal())[-n_dims_keep:]

    if relevant_dims.shape == ():
        relevant_dims = relevant_dims.reshape(1,)

    return relevant_dims
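A small sanity check of the optimizer above, on synthetic data (not part of the gist; it assumes the file is importable as plda.optimizer, as the model module imports it): the two scatter matrices should sum to the biased total covariance of X, and A and inv_A should be mutual inverses.

import numpy as np
from plda.optimizer import calc_scatter_matrices, optimize_maximum_likelihood

rng = np.random.default_rng(0)
n_classes, per_class, dim = 10, 50, 8
class_means = rng.normal(scale=3.0, size=(n_classes, dim))
X = np.vstack([m + rng.normal(size=(per_class, dim)) for m in class_means])
labels = np.repeat(np.arange(n_classes), per_class)

# Between- plus within-class scatter equals the (biased) total covariance.
S_b, S_w = calc_scatter_matrices(X, labels)
assert np.allclose(S_b + S_w, np.cov(X.T, bias=True))

# A and inv_A are inverses; Psi is non-negative and at most n_classes - 1
# of its diagonal entries (the "relevant" dimensions) are kept.
m, A, Psi, relevant_U_dims, inv_A = optimize_maximum_likelihood(X, labels, None)
assert np.allclose(A @ inv_A, np.eye(dim))
assert np.all(Psi.diagonal() >= 0) and relevant_U_dims.shape[0] <= n_classes - 1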
import os
import numpy as np
from pyannote.database import get_protocol
from pyannote.metrics.binary_classification import det_curve
from pyannote.audio.features import Precomputed
from plda.model import PLDA
from collections import Counter
# import matplotlib.pyplot as plt
# from matplotlib.pyplot import figure


def evaluate(protocol, precomputed: Precomputed, plda: PLDA, partition: str) -> float:
    y_true, y_pred = [], []
    for trial in getattr(protocol, f'{partition}_trial')():
        # Get embeddings from PLDA
        emb1 = plda.transform(data=precomputed(trial['file1']).data, from_space='D', to_space='U_model')
        emb2 = plda.transform(data=precomputed(trial['file2']).data, from_space='D', to_space='U_model')
        # Calculate PLDA score between PLDA latent embeddings
        score = plda.calc_same_diff_likelihood_ratio(emb1, emb2)
        # Keep track of score and reference
        y_pred.append(score)
        y_true.append(trial['reference'])
    _, _, _, eer = det_curve(np.array(y_true), np.array(y_pred), distances=False)
    return eer


PRETRAINED = os.environ['PRETRAINED']
precomputed = Precomputed(f'{PRETRAINED}/apply/2134', use_memmap=False)
protocol = get_protocol('VoxCeleb.SpeakerVerification.VoxCeleb1_X')
open('dump.txt', 'w').close()

# Get precomputed embeddings
print('[Getting precomputed embeddings]')
embs, y = [], []
for train_file in protocol.train():
    emb = precomputed(train_file).data.astype(np.float32)
    label = int(train_file['annotation'].labels()[0][2:])
    embs.append(emb)
    y.append(np.zeros(emb.shape[0]) + label)
embs = np.concatenate(embs, axis=0)
y = np.concatenate(y, axis=0)

# print('Plotting segment class distribution')
c = Counter(y)
speakers = np.array(list(c.keys())).astype(int)
segment_counts = np.array(list(c.values()))
# figure(num=None, figsize=(10, 6), dpi=80, facecolor='w', edgecolor='k')
# plt.hist(segment_counts, alpha=0.5, density=False, color='blue', histtype='stepfilled')
# plt.title('Segment Class Distribution (VoxCeleb1_X.train)')
# plt.xlabel('Number of Segments')
# plt.ylabel('% of Speakers')
# plt.savefig("segment_class_distribution.png")

print('[Sampling 500 segments per speaker]')
sample_embs, sample_y = [], []
for speaker, n_segments in zip(speakers, segment_counts):
    # Take every `sampling_step`-th segment; keep the step at least 1 so that
    # speakers with fewer than 500 segments are not dropped by a zero step.
    sampling_step = max(1, n_segments // 500)
    choices = embs[np.where(y == speaker)[0]][::sampling_step][:500]
    sample_embs.append(choices)
    sample_y.append(np.zeros(choices.shape[0]) + speaker)
sample_embs, sample_y = np.concatenate(sample_embs, axis=0), np.concatenate(sample_y, axis=0)
embs, y = sample_embs, sample_y

# Center embeddings
embs -= np.mean(embs, axis=0)

# Train PLDA, tuning the number of components using grid search
print('[Training PLDA]')
best_pca_ncomp, best_plda_ncomp, best_eer, best_plda = -1, -1, 100, None
for pca_ncomp in [None, 10, 20, 30, 40, 50, 80, 100, 120, 150, 200, 250, 300]:
    for plda_ncomp in [0.2, 0.4, 0.5, 0.6, 0.8, 1.]:
        print(f'[Fitting PLDA with {pca_ncomp} PCA components and {int(plda_ncomp * 100)}% of PLDA components]')
        # Embedding length normalization is done between PCA and PLDA
        plda = PLDA(embs, y, n_principal_components=pca_ncomp, n_relevant_dims_fraction=plda_ncomp)
        plda_dims = len(plda.relevant_U_dims)
        print(f'\tPLDA kept {plda_dims} components')

        # Evaluate on Dev
        print('\tEvaluating PLDA')
        eer = evaluate(protocol, precomputed, plda, 'development')
        print(f"\tDEV EER = {eer}")
        with open('dump.txt', 'a') as file:
            file.write(f'{pca_ncomp if pca_ncomp is not None else 0},{plda_ncomp},{plda_dims},{eer}\n')

        if eer < best_eer:
            best_eer, best_pca_ncomp, best_plda_ncomp, best_plda = eer, pca_ncomp, plda_ncomp, plda

print(f"Best Dev EER ({best_eer}) was achieved using {best_pca_ncomp} PCA components "
      f"and {int(best_plda_ncomp * 100)}% of PLDA components")

# Evaluate on Test
print(f"Evaluating on Test using {best_pca_ncomp} PCA components and "
      f"{int(best_plda_ncomp * 100)}% of PLDA components ({len(best_plda.relevant_U_dims)})")
eer = evaluate(protocol, precomputed, best_plda, 'test')
print(f"TEST EER = {eer}")