-
-
Save sharavsambuu/ee71c94c28f90da4813bbb1886812937 to your computer and use it in GitHub Desktop.
Triplet Loss Experiments For Audio Data with Tensorflow 2.0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import matplotlib.pyplot as plt | |
import tensorflow as tf | |
import tensorflow.keras.layers as layers | |
import tensorflow.keras.losses as loss | |
import tensorflow_datasets.public_api as tfds | |
from sys import argv | |
from random import Random | |
from os import walk | |
from scipy.fftpack import dct | |
from scipy.io import wavfile | |
from numpy.fft import rfft | |
def average_channels(audio):
    """Downmix a stereo signal to mono by averaging the two channels.

    Parameters
    ----------
    audio : np.ndarray
        Array of shape (t, 2) holding the two channel waveforms.

    Returns
    -------
    np.ndarray
        1-D array of length t with the per-sample channel mean.

    Raises
    ------
    ValueError
        If the input is not a 2-D array with exactly 2 channels.
    """
    # `assert` is stripped under `python -O`, so validate explicitly.
    # Checking ndim first also keeps the error readable for 1-D input,
    # where the old message itself raised IndexError on shape[1].
    if audio.ndim != 2 or audio.shape[1] != 2:
        raise ValueError(
            "We can only average 2 channels, array of shape {} given".format(audio.shape)
        )
    # np.mean over axis=1 already yields shape (t,); no reshape needed.
    return np.mean(audio, axis=1)
def ceps(window):
    """Standardize the magnitude spectrum of one windowed audio frame.

    Parameters
    ----------
    window : np.ndarray
        1-D array holding one (already tapered) frame of samples.

    Returns
    -------
    np.ndarray
        |rfft(window)| normalized to zero mean, divided by the frame's
        standard deviation (clamped to at least 1.0).
    """
    sample = np.abs(rfft(window))
    mu = np.mean(sample)
    # Clamp the divisor so near-silent frames don't blow up the scaling.
    std = max(np.std(sample), 1.0)
    # Reuse the magnitude spectrum instead of recomputing the FFT.
    return (sample - mu) / std
def spectrogram(audio, win_size, step_size):
    """Turn a 1-D waveform into a stack of standardized FFT-magnitude frames.

    Slides a Hann-tapered window of `win_size` samples over `audio` in
    steps of `step_size`, feeding each frame through `ceps`.

    Returns
    -------
    np.ndarray
        2-D array, one row per frame.
    """
    taper = np.hanning(win_size)
    frames = [
        ceps(audio[end - win_size:end] * taper)
        for end in range(win_size, len(audio), step_size)
    ]
    return np.stack(frames)
class AudioTripletsGenerator:
    """Samples (anchor, positive, negative) spectrogram snippets from a
    folder of .wav files.

    The anchor and positive come from the same file within `radius` frames
    of each other; the negative comes from a different file.
    """

    def __init__(self, feature_win, fft_win, fft_step, radius, seed, folder):
        """Read every .wav under `folder` and precompute its spectrogram.

        Parameters
        ----------
        feature_win : int
            Number of spectrogram frames per snippet.
        fft_win, fft_step : int
            Window and hop size (in samples) for the spectrogram.
        radius : int
            Max frame distance between anchor and positive.
        seed : int
            Seed for the private RNG, for reproducible sampling.
        folder : str
            Root directory to walk for .wav files.
        """
        self.radius = radius
        self.window_size = feature_win
        self.rand = Random(seed)
        self.specs = []
        self.raw_audio = []
        self.sample_rates = []
        for current, dirs, files in walk(folder):
            for filename in files:
                if '.wav' in filename:
                    print("READING: {}/{}".format(current, filename))
                    fs, data = wavfile.read("{}/{}".format(current, filename))
                    # Downmix stereo to mono before the FFT.
                    if len(data.shape) == 2:
                        data = average_channels(data)
                    spec = spectrogram(data, fft_win, fft_step)
                    self.specs.append(spec)
                    self.raw_audio.append(data)
                    self.sample_rates.append(fs)

    def sample(self, start, stop, exclude = None):
        """Return a random int in [start, stop], rejecting `exclude`."""
        s = self.rand.randint(start, stop)
        # s is always an int, so comparing against exclude alone suffices;
        # when exclude is None the loop never runs.
        while s == exclude:
            s = self.rand.randint(start, stop)
        return s

    def generate_triplet_ids(self):
        """Pick (file, frame) ids for an anchor/positive/negative triplet."""
        n = len(self.specs)
        assert n > 1, "We need more than one file to sample"
        anchor_file = self.sample(0, n - 1)
        # The negative must come from a different file than the anchor.
        negative_file = self.sample(0, n - 1, anchor_file)
        anchor_len = self.specs[anchor_file].shape[0]
        negative_len = self.specs[negative_file].shape[0]
        anchor_sample = self.sample(self.window_size, anchor_len - self.window_size)
        # Positive: a distinct frame within `radius` of the anchor,
        # clamped so the snippet stays inside the spectrogram.
        pos_sample = self.sample(
            max(0, anchor_sample - self.radius),
            min(anchor_sample + self.radius, anchor_len - self.window_size),
            anchor_sample
        )
        neg_sample = self.sample(self.window_size, negative_len - self.window_size)
        anchor_id = (anchor_file, anchor_sample)
        pos_id = (anchor_file, pos_sample)
        neg_id = (negative_file, neg_sample)
        return anchor_id, pos_id, neg_id

    def generate_triplet(self):
        """Return one (anchor, pos, neg) triplet of (t, f, 1) snippets."""
        ((ai, at), (_, pt), (ni, nt)) = self.generate_triplet_ids()
        anchor = self.specs[ai][at : at + self.window_size]
        pos = self.specs[ai][pt : pt + self.window_size]
        neg = self.specs[ni][nt : nt + self.window_size]
        (t, f) = anchor.shape
        # Add a trailing channel axis for the Conv2D input.
        anchor = anchor.reshape((t, f, 1))
        pos = pos.reshape((t, f, 1))
        neg = neg.reshape((t, f, 1))
        return anchor, pos, neg

    def generate_triplets(self, n, win, freq):
        """Return a batch of n triplets as three (n, win, freq, 1) arrays."""
        anchor = np.zeros((n, win, freq, 1))
        pos = np.zeros((n, win, freq, 1))
        neg = np.zeros((n, win, freq, 1))
        for i in range(n):
            # Unpack into distinct names; the original reused `n` here,
            # shadowing the batch-size parameter.
            a_snip, p_snip, n_snip = self.generate_triplet()
            anchor[i, :, :] = a_snip
            pos[i, :, :] = p_snip
            neg[i, :, :] = n_snip
        return anchor, pos, neg
class TripletLoss(loss.Loss):
    """Triplet margin loss: sum of max(d(a, p) - d(a, n) + margin, 0),
    with squared-Euclidean distances over the last axis."""

    def __init__(self, margin):
        super().__init__()
        # How much farther the negative must be than the positive.
        self.margin = margin

    def call(self, y_true, y_pred):
        # y_true is ignored; the loss depends only on the embeddings.
        # NOTE(review): this indexes y_pred along axis 0. With a
        # multi-output Keras model the framework calls the loss per
        # output, so confirm y_pred really arrives stacked as
        # [anchor, positive, negative] rather than as a single batch.
        anchor, positive, negative = y_pred[0], y_pred[1], y_pred[2]
        d_pos = tf.reduce_sum(tf.square(anchor - positive), axis=-1)
        d_neg = tf.reduce_sum(tf.square(anchor - negative), axis=-1)
        hinge = tf.maximum(d_pos - d_neg + self.margin, 0.0)
        return tf.reduce_sum(hinge)
def embedder(input_size, n_filters, kernel_size, out):
    """Build the embedding network: Conv2D -> frequency pooling -> GRU.

    Maps a (time, freq, 1) spectrogram snippet to a single `out`-dim
    embedding vector (the GRU's final state).
    """
    spec_in = layers.Input(input_size, name="spectrogram_snippet")
    conv = layers.Conv2D(
        n_filters, kernel_size, activation="relu", padding="same", name="convolution"
    )(spec_in)
    # Pool only along the frequency axis; time resolution is preserved.
    pooled = layers.MaxPooling2D((1, 2), name="pool_freq")(conv)
    # Collapse (freq, channels) so each time step becomes one vector.
    (_, t, f, d) = pooled.shape
    seq = layers.Reshape((t, f * d), name="flatten2seq")(pooled)
    embedding = layers.GRU(out, return_sequences=False, name="seq2one")(seq)
    return tf.keras.models.Model(inputs=[spec_in], outputs=[embedding])
def triplets(input_size, n_filters, kernel_size, out, margin):
    """Build the trainable triplet model around one shared embedder.

    All three branches reuse the same `embed_net` weights. Returns
    (embed_net, model) where `model` is compiled with TripletLoss.
    """
    embed_net = embedder(input_size, n_filters, kernel_size, out)
    branch_inputs = []
    branch_outputs = []
    # One input per triplet role, all routed through the shared network.
    for role in ("anchor", "pos", "neg"):
        role_in = layers.Input(input_size, name=role)
        branch_inputs.append(role_in)
        branch_outputs.append(embed_net(role_in))
    model = tf.keras.models.Model(inputs=branch_inputs, outputs=branch_outputs)
    model.compile(optimizer='adam', loss=TripletLoss(margin))
    return (embed_net, model)
def spectrogram_windows(raw_audio, fft_win, step_size, feature_win, feature_step):
    """Slice a waveform's spectrogram into fixed-length snippet windows.

    Parameters
    ----------
    raw_audio : np.ndarray
        1-D mono waveform.
    fft_win, step_size : int
        Spectrogram window and hop size in samples.
    feature_win : int
        Number of spectrogram frames per snippet.
    feature_step : int
        Hop (in frames) between consecutive snippets.

    Returns
    -------
    np.ndarray
        Array of shape (n_windows, feature_win, feature_out, 1).
    """
    spec = spectrogram(raw_audio, fft_win, step_size)
    (n, feature_out) = spec.shape
    # Size the output to exactly the number of windows the loop fills;
    # the old int(n / feature_step) estimate over-allocated and returned
    # trailing all-zero rows.
    starts = range(0, n - feature_win, feature_step)
    embedding = np.zeros((len(starts), feature_win, feature_out, 1))
    for window, t in enumerate(starts):
        embedding[window, :, :, :] = spec[t:t + feature_win].reshape((feature_win, feature_out, 1))
    return embedding
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment