@juanmc2005
Last active October 3, 2022 15:27
PLDA scoring using Pyannote (https://github.com/pyannote/pyannote-audio) and a customized version of PLDA (https://github.com/RaviSoji/plda) extended with features such as embedding length normalization and latent-space dimensionality tuning.
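The gist concatenates three pieces: the customized plda model class, its optimizer, and a pyannote evaluation script. For orientation, here is a minimal usage sketch of the PLDA class defined below; the embedding and label arrays are hypothetical stand-ins for real pyannote features:

import numpy as np
from plda.model import PLDA

# Hypothetical training data: row-wise speaker embeddings with integer speaker labels
embeddings = np.random.randn(2000, 128)
speaker_labels = np.repeat(np.arange(20), 100)

# PCA (optional), length normalization and PLDA are all fitted in a single call
plda = PLDA(embeddings, speaker_labels,
            n_principal_components=64,
            n_relevant_dims_fraction=0.5)

# Map two sets of embeddings into the latent model space and score the trial
u1 = plda.transform(embeddings[:5], from_space='D', to_space='U_model')
u2 = plda.transform(embeddings[100:105], from_space='D', to_space='U_model')
score = plda.calc_same_diff_likelihood_ratio(u1, u2)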
# Copyright 2017 Ravi Sojitra. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import numpy as np
from scipy.stats import multivariate_normal as gaussian
from sklearn.decomposition import PCA
from plda.optimizer import calc_scatter_matrices
from plda.optimizer import get_posterior_params
from plda.optimizer import get_posterior_predictive_params
from plda.optimizer import get_prior_params
from plda.optimizer import optimize_maximum_likelihood
def get_space_walk(from_space, to_space):
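    """ Yields consecutive (from, to) pairs along 'U_model' <-> 'U' <-> 'X' <-> 'D'.

    For example, get_space_walk('D', 'U_model') yields
    ('D', 'X'), ('X', 'U') and ('U', 'U_model'), which PLDA.transform uses to
    chain the corresponding single-step transformations.
    """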
U_model_to_D = ['U_model', 'U', 'X', 'D']
D_to_U_model = U_model_to_D[::-1]
assert from_space in U_model_to_D and to_space in U_model_to_D
from_idx = U_model_to_D.index(from_space)
to_idx = U_model_to_D.index(to_space)
if to_idx < from_idx:
spaces = D_to_U_model
from_idx = D_to_U_model.index(from_space)
to_idx = D_to_U_model.index(to_space)
else:
spaces = U_model_to_D
from_spaces = [x for x in spaces[from_idx: to_idx]]
to_spaces = [x for x in spaces[from_idx + 1: to_idx + 1]]
return zip(from_spaces, to_spaces)
def transform_D_to_X(data, pca):
return data if pca is None else pca.transform(data)
def transform_X_to_U(data, inv_A, m):
return np.matmul(data - m, inv_A.T)
def transform_U_to_U_model(data, relevant_U_dims):
return data[..., relevant_U_dims]
def transform_U_model_to_U(data, relevant_U_dims, U_dimensionality):
shape = (*data.shape[:-1], U_dimensionality)
U = np.zeros(shape)
U[..., relevant_U_dims] = data
return U
def transform_U_to_X(data, A, m):
return m + np.matmul(data, A.T)
def transform_X_to_D(data, pca):
return data if pca is None else pca.inverse_transform(data)
class PLDA:
def __init__(self, row_wise_data, labels, n_principal_components=None, n_relevant_dims_fraction=None):
assert len(row_wise_data.shape) == 2
assert len(labels) == row_wise_data.shape[0]
self.pca = None
self.m = None
self.A = None
self.Psi = None
self.relevant_U_dims = None
self.inv_A = None
self.prior_params = None
self.posterior_params = None
self.posterior_predictive_params = None
self.fit(row_wise_data, labels, n_principal_components, n_relevant_dims_fraction)
def calc_logp_posterior(self, v_model, category):
assert v_model.shape[-1] == self.get_dimensionality('U_model')
mean = self.posterior_params[category]['mean']
cov_diag = self.posterior_params[category]['cov_diag']
return gaussian(mean, np.diag(cov_diag)).logpdf(v_model)
def calc_logp_posterior_predictive(self, U_model, category):
assert U_model.shape[-1] == self.get_dimensionality('U_model')
mean = self.posterior_predictive_params[category]['mean']
cov_diag = self.posterior_predictive_params[category]['cov_diag']
return gaussian(mean, np.diag(cov_diag)).logpdf(U_model)
def calc_logp_marginal_likelihood(self, U_model):
""" Computes the log marginal likelihood on axis=-2. """
assert U_model.shape[-1] == self.get_dimensionality('U_model')
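        # Per latent dimension d (prior variance psi_d, unit within-class variance),
        # the marginal likelihood of n observations u_1, ..., u_n is
        #   log p(u_1, ..., u_n) = -(n / 2) * log(2 * pi)
        #                          - (1 / 2) * log(n * psi_d + 1)
        #                          - (1 / 2) * sum_i u_{i,d}^2
        #                          + (n^2 * psi_d * ubar_d^2) / (2 * (n * psi_d + 1))
        # where ubar_d is the per-dimension mean; the code below evaluates this
        # expression term by term and sums it over dimensions.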
if len(U_model.shape) == 1:
U_model = U_model[None, :]
n = U_model.shape[-2]
psi_diag = self.prior_params['cov_diag']
n_psi_plus_eye = n * psi_diag + 1
log_constant = -.5 * n * np.log(2 * np.pi)
log_constant += -.5 * np.log(n_psi_plus_eye)
sum_of_squares = np.sum(U_model ** 2, axis=-2)
log_exponent_1 = -.5 * sum_of_squares
mean = U_model.mean(axis=-2)
log_exponent_2 = .5 * (n ** 2 * psi_diag * mean ** 2)
log_exponent_2 /= n_psi_plus_eye
logp_ml = log_constant + log_exponent_1 + log_exponent_2
logp_ml = np.sum(logp_ml, axis=-1)
return logp_ml
def calc_logp_prior(self, v_model):
assert v_model.shape[-1] == self.get_dimensionality('U_model')
mean = self.prior_params['mean']
cov_diag = self.prior_params['cov_diag']
return gaussian(mean, np.diag(cov_diag)).logpdf(v_model)
def calc_same_diff_likelihood_ratio(self, U_model_p, U_model_g):
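        """ Log-likelihood ratio for a verification trial.

        Computes log p(probe, gallery | same class) - log p(probe) - log p(gallery)
        using the marginal likelihood above; larger values favour the
        same-class (same-speaker) hypothesis.
        """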
assert U_model_p.shape[-1] == self.get_dimensionality('U_model')
assert U_model_g.shape[-1] == self.get_dimensionality('U_model')
U_model_same = np.concatenate([
U_model_p,
U_model_g,
])
ll_same = self.calc_logp_marginal_likelihood(U_model_same)
ll_p = self.calc_logp_marginal_likelihood(U_model_p)
ll_g = self.calc_logp_marginal_likelihood(U_model_g)
ll_same = ll_same - (ll_p + ll_g)
return ll_same
def length_normalize(self, data):
# Normalize length of vectors to equal sqrt(embedding_dim)
sq_emb_dim = np.sqrt(data.shape[1])
# Calculate the norm as a column vector and repeat it `embedding_dim` times to broadcast division later
norms = np.linalg.norm(data, axis=1)
norms = np.expand_dims(norms, axis=1).repeat(data.shape[1], axis=1)
# Normalize using norms
data = sq_emb_dim * (data / norms)
        # Verify normalization was done correctly, at least for the first (up to) 1000 embeddings
        n_check = min(1000, data.shape[0])
        assert np.allclose(np.linalg.norm(data[:n_check], axis=1), np.full(n_check, sq_emb_dim))
return data
def fit(self, data, labels, n_principal_components=None, n_relevant_dims_fraction=None):
if n_principal_components is None:
S_b, S_w = calc_scatter_matrices(data, labels)
matrix_rank = np.linalg.matrix_rank(S_w)
else:
matrix_rank = n_principal_components
if matrix_rank != data.shape[-1]:
self.pca = PCA(n_components=matrix_rank)
self.pca.fit(data)
X = self.transform(data, from_space='D', to_space='X')
X = self.length_normalize(X)
self.m, self.A, self.Psi, self.relevant_U_dims, self.inv_A = \
optimize_maximum_likelihood(X, labels, n_relevant_dims_fraction)
U_model = self.transform(X, from_space='X', to_space='U_model')
self.prior_params = \
get_prior_params(self.Psi, self.relevant_U_dims)
self.posterior_params = \
get_posterior_params(U_model, labels, self.prior_params)
self.posterior_predictive_params = \
get_posterior_predictive_params(self.posterior_params)
def get_dimensionality(self, space):
if space == 'U_model':
return self.relevant_U_dims.shape[0]
elif space == 'U':
return self.A.shape[0]
elif space == 'X':
return self.A.shape[0]
elif space == 'D':
if self.pca is None:
return self.m.shape[0]
else:
return self.pca.n_features_
else:
            raise ValueError(f'Unknown space: {space}')
def transform(self, data, from_space, to_space):
""" Potential_spaces: 'D' <---> 'X' <---> 'U' <---> 'U_model'.
DESCRIPTION
There are 6 basic transformations to move back and forth
between the data space, 'D', and the model's space, 'U_model':
1. From D to X.
(i.e. from data space to preprocessed space)
Uses the minimum number of components from
Principal Components Analysis that
captures 100% of the variance in the data.
2. From X to U.
(i.e. from preprocessed space to latent space)
See the bottom of p.533 of Ioffe 2006.
3. From U to U_model.
(i.e. from latent space to the model space)
See Fig 2 on p.537 of Ioffe 2006.
4. From U_model to U.
(i.e. from the model space to latent space)
5. From U to X.
(i.e. from the latent space to the preprocessed space)
6. From X to D.
(i.e. from the preprocessed space to the data space)
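        EXAMPLE
            Chained transformations are handled automatically, e.g.
            plda.transform(data, from_space='D', to_space='U_model')
            applies steps 1 --> 2 --> 3 internally via get_space_walk.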
"""
if len(data.shape) == 1:
data = data[None, :]
if from_space == 'D' and to_space == 'X':
return transform_D_to_X(data, self.pca)
elif from_space == 'X' and to_space == 'U':
return transform_X_to_U(data, self.inv_A, self.m)
elif from_space == 'U' and to_space == 'U_model':
return transform_U_to_U_model(data, self.relevant_U_dims)
elif from_space == 'U_model' and to_space == 'U':
dim = self.get_dimensionality('U')
return transform_U_model_to_U(data, self.relevant_U_dims, dim)
elif from_space == 'U' and to_space == 'X':
return transform_U_to_X(data, self.A, self.m)
elif from_space == 'X' and to_space == 'D':
return transform_X_to_D(data, self.pca)
else:
transformed = data
for space_1, space_2 in get_space_walk(from_space, to_space):
transformed = self.transform(transformed, space_1, space_2)
return transformed
# Copyright 2017 Ravi Sojitra. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import numpy as np
from scipy.linalg import eigh
def optimize_maximum_likelihood(X, labels, n_relevant_dims_fraction):
""" Performs the optimization in Fig. 2 of p.537 of Ioffe 2006.
DESCRIPTION
- The main model parameters are `m`, `A`, and `Psi`.
- However, to improve the performance (speed and numerical stability)
of the plda.Model object,
inv_A and relevant_U_dims are also returned here.
ADDITIONAL NOTES
Be sure to test that np.cov(X.T) is full rank before running this.
Recall that there are 4 \"spaces\":
'D' (data) <---> 'X' (preprocessed) <---> 'U' (latent) <---> 'U_model'
ARGUMENTS
X (numpy.ndarray), shape=(n_data, n_dimensions)
- Data in statistics format, i.e. row-wise.
labels (list or numpy.ndarray), length=X.shape[0]
- Labels for the data in `X`.
- Must be sorted in the same order as `X`.
RETURNS
m (numpy.ndarray), shape=X.shape[-1]
- The mean of the row vectors in X.
- This is the prior mean fitted via maximum likelihood.
A (numpy.ndarray), shape=(X.shape[-1], X.shape[-1])
- Transformation from X space to the latent U space.
Psi (numpy.ndarray), shape=(X.shape[-1], X.shape[-1])
- The covariance matrix of the prior distribution on
the category means in U space.
relevant_U_dims (numpy.ndarray), shape=(len(np.unique(labels)) - 1,)
- The \"effective\" latent dimensions,
i.e. the ones that are actually used by the model.
inv_A (numpy.ndarray), shape=A.shape
- The inverse of the matrix A.
- Transformation from the latent U space to the X space.
"""
assert len(X.shape) == 2
assert X.shape[0] == len(labels)
m = calc_m(X)
S_b, S_w = calc_scatter_matrices(X, labels)
W = calc_W(S_b, S_w)
Lambda_b = calc_Lambda_b(S_b, W)
Lambda_w = calc_Lambda_w(S_w, W)
n_avg = calc_n_avg(labels)
A = calc_A(n_avg, Lambda_w, W)
inv_A = np.linalg.inv(A)
Psi = calc_Psi(Lambda_w, Lambda_b, n_avg)
relevant_U_dims = get_relevant_U_dims(Psi, n_relevant_dims_fraction)
return m, A, Psi, relevant_U_dims, inv_A
def as_dictionary_of_dictionaries(labels, means, cov_diags):
""" Dictionary storing one dictionary of parameters per category. """
assert len(labels) == len(means) == len(cov_diags)
all_params = dict()
for label, mean, cov_diag in zip(labels, means, cov_diags):
category_params = dict()
category_params['mean'] = mean
category_params['cov_diag'] = cov_diag
all_params[label] = category_params
return all_params
def calc_A(n_avg, Lambda_w, W):
""" See Fig. 2 on p.537 of Ioffe 2006. """
Lambda_w_diagonal = Lambda_w.diagonal() # Should be diagonal matrix.
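    # A = W^{-T} * diag( (n_avg / (n_avg - 1)) * Lambda_w )^{1/2}; the
    # element-wise product below is equivalent to multiplying by the
    # diagonal square-root matrix on the right.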
inv_W_T = np.linalg.inv(W.T)
return inv_W_T * (n_avg / (n_avg - 1) * Lambda_w_diagonal) ** .5
def calc_Lambda_b(S_b, W):
""" See Fig. 2 on p.537 of Ioffe 2006. """
return np.matmul(np.matmul(W.T, S_b), W)
def calc_Lambda_w(S_w, W):
""" See Fig. 2 on p.537 of Ioffe 2006. """
return np.matmul(np.matmul(W.T, S_w), W)
def calc_m(X):
""" See Fig. 2 on p.537 of Ioffe 2006. """
assert len(X.shape) == 2
return X.mean(axis=0)
def calc_n_avg(Y):
""" This is the \"hack\" suggested in Fig 2 on p.537 of Ioffe 2006. """
unique = np.unique(Y)
return len(Y) / unique.shape[0]
def calc_Psi(Lambda_w, Lambda_b, n_avg):
""" See Fig. 2 on p.537 of Ioffe 2006. """
Lambda_w_diagonal = Lambda_w.diagonal() # Should be diagonal matrix.
Lambda_b_diagonal = Lambda_b.diagonal() # Should be diagonal matrix.
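    # Psi = max(0, (n_avg - 1) / n_avg * Lambda_b / Lambda_w - 1 / n_avg),
    # computed element-wise on the diagonals and clipped at zero below.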
Psi = (n_avg - 1) / n_avg * Lambda_b_diagonal / Lambda_w_diagonal
Psi -= 1 / n_avg
Psi[Psi <= 0] = 0
return np.diag(Psi)
def calc_scatter_matrices(X, Y):
""" See Equations (1) on p.532 of Ioffe 2006. """
assert len(X.shape) == 2
assert X.shape[0] == len(Y)
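    # S_b = sum_k (n_k / N) * (m_k - m)(m_k - m)^T
    # S_w = (1 / N) * sum_k sum_{i in class k} (x_i - m_k)(x_i - m_k)^T,
    # where each class's sum of squares is recovered from np.cov through the (n_k - 1) factor.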
unique_labels = np.unique(Y)
labels = np.asarray(Y)
m = calc_m(X)
N = X.shape[0]
cov_ks = []
m_ks = []
n_ks = []
for k in unique_labels:
bool_idxs = labels == k
X_k = X[bool_idxs]
m_ks.append(X_k.mean(axis=0))
n_ks.append(bool_idxs.sum())
cov_ks.append(np.cov(X_k.T))
n_ks = np.asarray(n_ks)
m_ks = np.asarray(m_ks)
m_ks_minus_m = m_ks - m
S_b = np.matmul(m_ks_minus_m.T * (n_ks / N), m_ks_minus_m)
S_w = np.asarray(cov_ks) * ((n_ks - 1) / N)[:, None, None]
S_w = np.sum(S_w, axis=0)
return S_b, S_w
def calc_W(S_b, S_w):
""" See Fig. 2 on p.537 of Ioffe 2006. """
eigenvalues, eigenvectors = eigh(S_b, S_w)
return eigenvectors
def get_posterior_params(U_model, Y, prior_params):
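    """ Gaussian posterior over each category's latent center.

    With a N(0, diag(psi)) prior and unit within-class covariance, the
    posterior given the n_k vectors of a category has
    cov_diag = psi / (1 + n_k * psi) and
    mean = cov_diag * (sum of that category's U_model vectors).
    """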
labels = np.asarray(Y)
prior_cov_diagonal = prior_params['cov_diag']
cov_diags = []
means = []
categories = []
for k in np.unique(labels):
bool_idxs = labels == k
U_model_k = U_model[bool_idxs]
n_k = bool_idxs.sum()
cov_diag = prior_cov_diagonal / (1 + n_k * prior_cov_diagonal)
mean = U_model_k.sum(axis=0) * cov_diag
cov_diags.append(cov_diag)
means.append(mean)
categories.append(k)
return as_dictionary_of_dictionaries(categories, means, cov_diags)
def get_posterior_predictive_params(posterior_params):
""" Likelihood covariance matrix is an Identity matrix. """
    # Build fresh per-category dicts: dict.copy() is shallow, so an in-place
    # `+= 1` on the shared arrays would also corrupt the posterior parameters.
    pp_params = dict()
    for k, k_params in posterior_params.items():
        pp_params[k] = {'mean': k_params['mean'],
                        'cov_diag': k_params['cov_diag'] + 1}
    return pp_params
def get_prior_params(Psi, dims):
""" See Equation (2) on p.533 of Ioffe 2006. """
cov_diag = Psi.diagonal()[dims]
mean = np.zeros(dims.shape)
return {'mean': mean, 'cov_diag': cov_diag}
def get_relevant_U_dims(Psi, n_relevant_dims_fraction):
""" See Fig. 2 on p.537 of Ioffe 2006. """
relevant_dims = np.squeeze(np.argwhere(Psi.diagonal() != 0))
if n_relevant_dims_fraction is not None:
n_dims_keep = int(n_relevant_dims_fraction * relevant_dims.shape[0])
relevant_dims = np.argsort(Psi.diagonal())[-n_dims_keep:]
if relevant_dims.shape == ():
relevant_dims = relevant_dims.reshape(1,)
return relevant_dims
import os
import numpy as np
from pyannote.database import get_protocol
from pyannote.metrics.binary_classification import det_curve
from pyannote.audio.features import Precomputed
from plda.model import PLDA
from collections import Counter
# import matplotlib.pyplot as plt
# from matplotlib.pyplot import figure
def evaluate(protocol, precomputed: Precomputed, plda: PLDA, partition: str) -> float:
y_true, y_pred = [], []
for trial in getattr(protocol, f'{partition}_trial')():
# Get embeddings from PLDA
emb1 = plda.transform(data=precomputed(trial['file1']).data, from_space='D', to_space='U_model')
emb2 = plda.transform(data=precomputed(trial['file2']).data, from_space='D', to_space='U_model')
# Calculate PLDA score between PLDA latent embeddings
score = plda.calc_same_diff_likelihood_ratio(emb1, emb2)
# Keep track of score and reference
y_pred.append(score)
y_true.append(trial['reference'])
_, _, _, eer = det_curve(np.array(y_true), np.array(y_pred), distances=False)
return eer
PRETRAINED = os.environ['PRETRAINED']
precomputed = Precomputed(f'{PRETRAINED}/apply/2134', use_memmap=False)
protocol = get_protocol('VoxCeleb.SpeakerVerification.VoxCeleb1_X')
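# Truncate dump.txt so that grid-search results from previous runs are discarded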
open('dump.txt', 'w').close()
# Get precomputed embeddings
print('[Getting precomputed embeddings]')
embs, y = [], []
for train_file in protocol.train():
emb = precomputed(train_file).data.astype(np.float32)
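    # Speaker names are assumed to follow the VoxCeleb 'idXXXXX' convention,
    # so stripping the first two characters yields an integer speaker id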
label = int(train_file['annotation'].labels()[0][2:])
embs.append(emb)
y.append(np.zeros(emb.shape[0]) + label)
embs = np.concatenate(embs, axis=0)
y = np.concatenate(y, axis=0)
# print('Plotting segment class distribution')
c = Counter(y)
speakers = np.array(list(c.keys())).astype(int)
segment_counts = np.array(list(c.values()))
# figure(num=None, figsize=(10, 6), dpi=80, facecolor='w', edgecolor='k')
# plt.hist(segment_counts, alpha=0.5, density=False, color='blue', histtype='stepfilled')
# plt.title('Segment Class Distribution (VoxCeleb1_X.train)')
# plt.xlabel('Number of Segments')
# plt.ylabel('% of Speakers')
# plt.savefig("segment_class_distribution.png")
print('[Sampling 500 segments per speaker]')
sample_embs, sample_y = [], []
for speaker, n_segments in zip(speakers, segment_counts):
    # Guard against speakers with fewer than 500 segments (a step of 0 would be invalid)
    sampling_step = max(1, n_segments // 500)
choices = embs[np.where(y == speaker)[0]][::sampling_step][:500]
sample_embs.append(choices)
sample_y.append(np.zeros(choices.shape[0]) + speaker)
sample_embs, sample_y = np.concatenate(sample_embs, axis=0), np.concatenate(sample_y, axis=0)
embs, y = sample_embs, sample_y
# Center embeddings
embs -= np.mean(embs, axis=0)
# Train PLDA tuning number of components using grid search
print('[Training PLDA]')
best_pca_ncomp, best_plda_ncomp, best_eer, best_plda = -1, -1, 100, None
for pca_ncomp in [None, 10, 20, 30, 40, 50, 80, 100, 120, 150, 200, 250, 300]:
for plda_ncomp in [0.2, 0.4, 0.5, 0.6, 0.8, 1.]:
print(f'[Fitting PLDA with {pca_ncomp} PCA components and {int(plda_ncomp * 100)}% of PLDA components]')
# Embedding length normalization is done between PCA and PLDA
plda = PLDA(embs, y, n_principal_components=pca_ncomp, n_relevant_dims_fraction=plda_ncomp)
plda_dims = len(plda.relevant_U_dims)
print(f'\tPLDA kept {plda_dims} components')
# Evaluate on Dev
print('\tEvaluating PLDA')
eer = evaluate(protocol, precomputed, plda, 'development')
print(f"\tDEV EER = {eer}")
with open('dump.txt', 'a') as file:
file.write(f'{pca_ncomp if pca_ncomp is not None else 0},{plda_ncomp},{plda_dims},{eer}\n')
if eer < best_eer:
best_eer, best_pca_ncomp, best_plda_ncomp, best_plda = eer, pca_ncomp, plda_ncomp, plda
print(f"Best Dev EER ({best_eer}) was achieved using {best_pca_ncomp} PCA components "
      f"and {int(best_plda_ncomp * 100)}% of PLDA components")
# Evaluate on Test
print(f"Evaluating on Test using {best_pca_ncomp} PCA components and "
f"{int(best_plda_ncomp * 100)}% of PLDA components ({len(best_plda.relevant_U_dims)})")
eer = evaluate(protocol, precomputed, best_plda, 'test')
print(f"TEST EER = {eer}")

juanmc2005 commented Dec 19, 2019

TO DO

  • Verify class balance and fix if needed
  • Add vector length normalization
  • Add vector centering
  • S-norm scores?
