Skip to content

Instantly share code, notes, and snippets.

@dkohlsdorf
Last active July 1, 2021 22:50
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save dkohlsdorf/90df4721cd70c6f4420b7e049796280b to your computer and use it in GitHub Desktop.
Save dkohlsdorf/90df4721cd70c6f4420b7e049796280b to your computer and use it in GitHub Desktop.
Triplet Loss Experiments for Audio Data with TensorFlow 2.0
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras.layers as layers
import tensorflow.keras.losses as loss
import tensorflow_datasets.public_api as tfds
from sys import argv
from random import Random
from os import walk
from scipy.fftpack import dct
from scipy.io import wavfile
from numpy.fft import rfft
def average_channels(audio):
    """Down-mix multi-channel audio to mono by averaging across channels.

    Args:
        audio: 2-D array laid out as (samples, channels).

    Returns:
        1-D array of length ``audio.shape[0]`` holding the per-sample mean
        over all channels.

    Raises:
        ValueError: if ``audio`` is not 2-D or has no channels.
    """
    # Explicit check instead of `assert`: asserts vanish under `python -O`,
    # and the mean below is well defined for any positive channel count,
    # not just stereo.
    if audio.ndim != 2 or audio.shape[1] < 1:
        raise ValueError(
            "Expected audio of shape (samples, channels), got shape {}".format(audio.shape)
        )
    # Reducing over axis 1 already yields shape (samples,).
    return np.mean(audio, axis=1)
def ceps(window):
    """Return the standardized magnitude spectrum of one analysis window.

    The real FFT magnitude of ``window`` is shifted by its mean and divided
    by its standard deviation. The divisor is floored at 1.0, so spectra
    with little variation are not amplified.

    Note: no logarithm or inverse transform is applied here; the result is a
    normalized magnitude spectrum with ``len(window) // 2 + 1`` bins.

    Args:
        window: 1-D array of (already tapered) time-domain samples.

    Returns:
        1-D float array of normalized FFT magnitudes.
    """
    # Compute the FFT magnitude once and reuse it for both the statistics
    # and the normalized output.
    magnitude = np.abs(rfft(window))
    mu = np.mean(magnitude)
    std = max(np.std(magnitude), 1.0)
    return (magnitude - mu) / std
def spectrogram(audio, win_size, step_size):
    """Build a spectrogram by sliding a Hann-tapered window over ``audio``.

    Window end positions run over ``range(win_size, len(audio), step_size)``,
    so a window that would end exactly at ``len(audio)`` is not produced.
    Each window is multiplied by a Hann taper and passed through ``ceps``.

    Args:
        audio: 1-D array of samples.
        win_size: number of samples per analysis window.
        step_size: hop between consecutive windows, in samples.

    Returns:
        2-D array with one row per window (rows stacked with ``np.stack``).
        ``np.stack`` raises ``ValueError`` if no window fits.
    """
    taper = np.hanning(win_size)
    frames = [
        ceps(audio[end - win_size:end] * taper)
        for end in range(win_size, len(audio), step_size)
    ]
    return np.stack(frames)
class AudioTripletsGenerator:
    """Draws (anchor, positive, negative) spectrogram windows from wav files.

    On construction every file below ``folder`` whose name ends in ``.wav``
    is read, down-mixed to mono if it is 2-D, and converted to a spectrogram.
    Triplets are then sampled so that:

    * anchor and positive come from the same file, with the positive start
      frame at most ``radius`` frames away from the anchor start frame;
    * the negative comes from a different file.
    """

    def __init__(self, feature_win, fft_win, fft_step, radius, seed, folder):
        """Load all wav files under ``folder`` and pre-compute spectrograms.

        Args:
            feature_win: number of spectrogram frames per sampled window.
            fft_win: FFT window length in samples, passed to ``spectrogram``.
            fft_step: FFT hop in samples, passed to ``spectrogram``.
            radius: max frame distance between anchor and positive starts.
            seed: seed for the internal ``random.Random`` instance.
            folder: root directory that is walked recursively.
        """
        self.radius = radius
        self.window_size = feature_win
        self.rand = Random(seed)
        self.specs = []         # one spectrogram (frames x bins) per file
        self.raw_audio = []     # mono samples per file
        self.sample_rates = []  # sample rate per file as reported by wavfile
        for current, _dirs, files in walk(folder):
            for filename in files:
                # Match on the extension only; a substring test would also
                # pick up names such as "take.wav.bak".
                if not filename.endswith('.wav'):
                    continue
                print("READING: {}/{}".format(current, filename))
                fs, data = wavfile.read("{}/{}".format(current, filename))
                if len(data.shape) == 2:
                    data = average_channels(data)
                spec = spectrogram(data, fft_win, fft_step)
                self.specs.append(spec)
                self.raw_audio.append(data)
                self.sample_rates.append(fs)

    def sample(self, start, stop, exclude=None):
        """Draw a random integer in ``[start, stop]`` that differs from ``exclude``.

        Args:
            start: inclusive lower bound.
            stop: inclusive upper bound.
            exclude: value that must not be returned, or ``None``.

        Returns:
            The sampled integer.

        Raises:
            ValueError: if the range contains only ``exclude`` (rejection
                sampling could never terminate), or if ``randint`` rejects
                the range because ``start > stop``.
        """
        if exclude is not None and start == stop == exclude:
            raise ValueError(
                "Cannot sample from [{}, {}] while excluding {}".format(start, stop, exclude)
            )
        s = self.rand.randint(start, stop)
        while exclude is not None and s == exclude:
            s = self.rand.randint(start, stop)
        return s

    def generate_triplet_ids(self):
        """Sample the (file index, start frame) ids of one triplet.

        The anchor start is drawn from ``[window_size, len - window_size]``
        of its file, so a file needs at least ``2 * window_size`` frames.
        The positive start is drawn from the anchor file within ``radius``
        frames of the anchor, clipped to ``[0, len - window_size]`` and never
        equal to the anchor start. The negative is drawn from another file.

        Returns:
            Tuple ``(anchor_id, pos_id, neg_id)``, each ``(file, frame)``.
        """
        n = len(self.specs)
        assert n > 1, "We need more than one file to sample"
        anchor_file = self.sample(0, n - 1)
        negative_file = self.sample(0, n - 1, anchor_file)
        anchor_len = self.specs[anchor_file].shape[0]
        negative_len = self.specs[negative_file].shape[0]
        anchor_sample = self.sample(self.window_size, anchor_len - self.window_size)
        pos_sample = self.sample(
            max(0, anchor_sample - self.radius),
            min(anchor_sample + self.radius, anchor_len - self.window_size),
            anchor_sample
        )
        neg_sample = self.sample(self.window_size, negative_len - self.window_size)
        anchor_id = (anchor_file, anchor_sample)
        pos_id = (anchor_file, pos_sample)
        neg_id = (negative_file, neg_sample)
        return anchor_id, pos_id, neg_id

    def generate_triplet(self):
        """Sample one triplet of spectrogram windows.

        Returns:
            Tuple ``(anchor, pos, neg)``; each is a slice of ``window_size``
            frames reshaped to ``(frames, bins, 1)``.
        """
        ((ai, at), (_, pt), (ni, nt)) = self.generate_triplet_ids()
        anchor = self.specs[ai][at : at + self.window_size]
        pos = self.specs[ai][pt : pt + self.window_size]
        neg = self.specs[ni][nt : nt + self.window_size]
        (t, f) = anchor.shape
        anchor = anchor.reshape((t, f, 1))
        pos = pos.reshape((t, f, 1))
        neg = neg.reshape((t, f, 1))
        return anchor, pos, neg

    def generate_triplets(self, n, win, freq):
        """Sample a batch of ``n`` triplets.

        Args:
            n: number of triplets to draw.
            win: frames per window; must match the sampled window shape.
            freq: spectrogram bins per frame; must match the sampled windows.

        Returns:
            Tuple ``(anchor, pos, neg)`` of arrays shaped ``(n, win, freq, 1)``.
        """
        anchor = np.zeros((n, win, freq, 1))
        pos = np.zeros((n, win, freq, 1))
        neg = np.zeros((n, win, freq, 1))
        for i in range(n):
            # Distinct names: do not rebind the batch-size parameter `n`.
            a_win, p_win, n_win = self.generate_triplet()
            anchor[i, :, :] = a_win
            pos[i, :, :] = p_win
            neg[i, :, :] = n_win
        return anchor, pos, neg
class TripletLoss(loss.Loss):
    """Hinge triplet loss on squared Euclidean distances.

    Computes ``sum(max(|a - p|^2 - |a - n|^2 + margin, 0))`` where ``a``,
    ``p`` and ``n`` are taken from ``y_pred[0]``, ``y_pred[1]`` and
    ``y_pred[2]``.
    """

    def __init__(self, margin):
        """Store the margin added to the distance difference."""
        super().__init__()
        self.margin = margin

    def call(self, y_true, y_pred):
        """Return the summed hinge loss; ``y_true`` is not used.

        NOTE(review): this indexes ``y_pred`` along its first axis and treats
        the entries as (anchor, positive, negative). Confirm against the
        caller that ``y_pred`` really carries the three embeddings there;
        with a multi-output Keras model the loss may be invoked per output.
        """
        anchor, pos, neg = y_pred[0], y_pred[1], y_pred[2]
        # Squared distances, reduced over the last (embedding) axis.
        d_pos = tf.reduce_sum(tf.square(tf.subtract(anchor, pos)), axis=-1)
        d_neg = tf.reduce_sum(tf.square(tf.subtract(anchor, neg)), axis=-1)
        violation = tf.add(tf.subtract(d_pos, d_neg), self.margin)
        hinge = tf.maximum(violation, 0.0)
        return tf.reduce_sum(hinge)
def embedder(input_size, n_filters, kernel_size, out):
    """Build the embedding network: Conv2D -> MaxPool -> Reshape -> GRU.

    Args:
        input_size: shape of one input snippet (without the batch axis).
            Presumably (frames, bins, 1) as produced by the triplet
            generator; verify against the caller.
        n_filters: number of convolution filters.
        kernel_size: convolution kernel size.
        out: number of GRU units, i.e. the embedding dimension.

    Returns:
        A Keras model mapping one snippet to the final GRU state.
    """
    snippet = layers.Input(input_size, name="spectrogram_snippet")
    features = layers.Conv2D(
        n_filters,
        kernel_size,
        activation="relu",
        padding="same",
        name="convolution",
    )(snippet)
    # Pool size (1, 2): halves the third axis only, leaves the second intact.
    features = layers.MaxPooling2D((1, 2), name="pool_freq")(features)
    _, steps, bins, channels = features.shape
    # Merge the last two axes so each step becomes one feature vector.
    sequence = layers.Reshape((steps, bins * channels), name="flatten2seq")(features)
    embedding = layers.GRU(out, return_sequences=False, name="seq2one")(sequence)
    return tf.keras.models.Model(inputs=[snippet], outputs=[embedding])
def triplets(input_size, n_filters, kernel_size, out, margin):
    """Build the shared embedder and the three-branch triplet training model.

    A single embedding network is created and applied to three inputs named
    "anchor", "pos" and "neg", so all branches share weights. The combined
    model is compiled with the Adam optimizer and a ``TripletLoss``.

    Args:
        input_size: shape of one input snippet, forwarded to ``embedder``.
        n_filters: number of convolution filters, forwarded to ``embedder``.
        kernel_size: convolution kernel size, forwarded to ``embedder``.
        out: embedding dimension, forwarded to ``embedder``.
        margin: margin passed to ``TripletLoss``.

    Returns:
        Tuple ``(embed_net, model)``: the shared embedder and the compiled
        three-input / three-output training model.
    """
    embed_net = embedder(input_size, n_filters, kernel_size, out)
    branch_inputs = []
    branch_outputs = []
    for branch in ("anchor", "pos", "neg"):
        branch_in = layers.Input(input_size, name=branch)
        branch_inputs.append(branch_in)
        branch_outputs.append(embed_net(branch_in))
    model = tf.keras.models.Model(inputs=branch_inputs, outputs=branch_outputs)
    model.compile(optimizer='adam', loss=TripletLoss(margin))
    return (embed_net, model)
def spectrogram_windows(raw_audio, fft_win, step_size, feature_win, feature_step):
    """Cut the spectrogram of ``raw_audio`` into fixed-size sliding windows.

    Window start frames run over ``range(0, n - feature_win, feature_step)``
    where ``n`` is the number of spectrogram frames.

    Args:
        raw_audio: 1-D array of samples.
        fft_win: FFT window length in samples, passed to ``spectrogram``.
        step_size: FFT hop in samples, passed to ``spectrogram``.
        feature_win: number of spectrogram frames per output window.
        feature_step: hop between output windows, in frames.

    Returns:
        Array shaped ``(windows, feature_win, bins, 1)`` with exactly one
        entry per window start; empty along the first axis if no window fits.
    """
    spec = spectrogram(raw_audio, fft_win, step_size)
    (n, feature_out) = spec.shape
    starts = range(0, n - feature_win, feature_step)
    # Size the buffer from the actual number of windows. The previous
    # int(n / feature_step) estimate could be too small (IndexError when
    # feature_win < n % feature_step) or too large (trailing all-zero rows).
    embedding = np.zeros((len(starts), feature_win, feature_out, 1))
    for window, t in enumerate(starts):
        embedding[window, :, :, :] = spec[t:t + feature_win].reshape((feature_win, feature_out, 1))
    return embedding
@eoehri
Copy link

eoehri commented Dec 21, 2019

Hi @dkohlsdorf, could you provide an example of how to use that code? Many thanks in advance!

@sunyikang
Copy link

Same ask.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment