Skip to content

Instantly share code, notes, and snippets.

@sharavsambuu
Forked from dkohlsdorf/triplet.py
Created December 23, 2019 11:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sharavsambuu/ee71c94c28f90da4813bbb1886812937 to your computer and use it in GitHub Desktop.
Save sharavsambuu/ee71c94c28f90da4813bbb1886812937 to your computer and use it in GitHub Desktop.
Triplet Loss Experiments For Audio Data with Tensorflow 2.0
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras.layers as layers
import tensorflow.keras.losses as loss
import tensorflow_datasets.public_api as tfds
from sys import argv
from random import Random
from os import walk
from scipy.fftpack import dct
from scipy.io import wavfile
from numpy.fft import rfft
def average_channels(audio):
    """Collapse a two-channel recording into a single channel.

    `audio` is indexed as (sample, channel). The two channel values of every
    sample are averaged, giving a 1-D array with one entry per sample.

    Raises:
        AssertionError: if the second axis does not have exactly 2 entries.
    """
    n_channels = audio.shape[1]
    assert n_channels == 2, "We can only average 2 channels {} given".format(n_channels)
    n_samples = audio.shape[0]
    mono = np.mean(audio, axis=1)
    return mono.reshape(n_samples)
def ceps(window):
    """Return the standardized magnitude spectrum of one audio frame.

    The magnitudes of the real FFT of `window` are centred on their mean and
    divided by their standard deviation. Note that, despite the name, no
    logarithm or inverse transform is applied: the result is a normalized
    spectrum, not a cepstrum.

    Args:
        window: 1-D sequence of samples (typically already tapered).

    Returns:
        1-D array with `len(window) // 2 + 1` normalized magnitudes.
    """
    # Compute the spectrum once and reuse it for both the statistics and the
    # normalized output.
    magnitude = np.abs(rfft(window))
    mu = np.mean(magnitude)
    # Floor the divisor at 1.0 so silent or near-flat frames are only centred
    # instead of being blown up by a tiny standard deviation.
    std = max(np.std(magnitude), 1.0)
    return (magnitude - mu) / std
def spectrogram(audio, win_size, step_size):
    """Build a spectrogram from Hanning-tapered, normalized FFT frames.

    Frames of `win_size` samples are taken every `step_size` samples. Frame
    end positions run from `win_size` up to, but not including, `len(audio)`,
    so a frame ending exactly on the last sample is not produced. Each frame
    is multiplied by a Hanning window and passed through `ceps`.

    Returns:
        Array with one row per frame, stacked in time order. `np.stack`
        raises ValueError when `audio` is too short to yield any frame.
    """
    taper = np.hanning(win_size)
    frame_ends = range(win_size, len(audio), step_size)
    frames = [ceps(audio[end - win_size:end] * taper) for end in frame_ends]
    return np.stack(frames)
class AudioTripletsGenerator:
    """Sample (anchor, positive, negative) spectrogram snippets from WAV files.

    The anchor and the positive snippet come from the same file, at most
    `radius` spectrogram frames apart; the negative snippet comes from a
    different file. All random choices go through one seeded `Random`
    instance, so a given seed reproduces the same sequence of triplets.
    """

    def __init__(self, feature_win, fft_win, fft_step, radius, seed, folder):
        """Load every WAV file below `folder` and compute its spectrogram.

        Args:
            feature_win: number of spectrogram frames per snippet.
            fft_win: FFT window length in samples, passed to `spectrogram`.
            fft_step: hop between FFT windows in samples.
            radius: maximum frame distance between anchor and positive.
            seed: seed for the internal random number generator.
            folder: directory that is walked recursively for WAV files.
        """
        self.radius = radius
        self.window_size = feature_win
        self.rand = Random(seed)
        self.specs = []         # one spectrogram per file
        self.raw_audio = []     # mono sample data, same order as specs
        self.sample_rates = []  # sample rate reported for each file
        for current, _dirs, files in walk(folder):
            for filename in files:
                # Match on the extension only: a plain substring test would
                # also pick up names such as "take.wav.bak" and miss ".WAV".
                if not filename.lower().endswith('.wav'):
                    continue
                path = "{}/{}".format(current, filename)
                print("READING: {}".format(path))
                fs, data = wavfile.read(path)
                if data.ndim == 2:
                    # Multi-channel data is reduced to mono before analysis.
                    data = average_channels(data)
                self.specs.append(spectrogram(data, fft_win, fft_step))
                self.raw_audio.append(data)
                self.sample_rates.append(fs)

    def sample(self, start, stop, exclude=None):
        """Draw an integer from [start, stop] (both inclusive), never `exclude`.

        Raises:
            ValueError: if the only value in range is the excluded one, which
                would otherwise make the rejection loop spin forever.
        """
        if exclude is not None and start == stop == exclude:
            raise ValueError(
                "cannot sample from [{}, {}] while excluding {}".format(start, stop, exclude)
            )
        s = self.rand.randint(start, stop)
        while exclude is not None and s == exclude:
            s = self.rand.randint(start, stop)
        return s

    def generate_triplet_ids(self):
        """Pick (file index, start frame) pairs for anchor, positive, negative.

        Returns:
            Three `(file_index, start_frame)` tuples. The positive shares the
            anchor's file and starts within `radius` frames of it (never at
            the same frame); the negative comes from a different file.
        """
        n = len(self.specs)
        assert n > 1, "We need more than one file to sample"
        anchor_file = self.sample(0, n - 1)
        negative_file = self.sample(0, n - 1, anchor_file)
        anchor_len = self.specs[anchor_file].shape[0]
        negative_len = self.specs[negative_file].shape[0]
        # Upper bound keeps a full snippet of window_size frames inside the file.
        last_anchor_start = anchor_len - self.window_size
        anchor_sample = self.sample(self.window_size, last_anchor_start)
        pos_sample = self.sample(
            max(0, anchor_sample - self.radius),
            min(anchor_sample + self.radius, last_anchor_start),
            anchor_sample,
        )
        neg_sample = self.sample(self.window_size, negative_len - self.window_size)
        anchor_id = (anchor_file, anchor_sample)
        pos_id = (anchor_file, pos_sample)
        neg_id = (negative_file, neg_sample)
        return anchor_id, pos_id, neg_id

    def generate_triplet(self):
        """Return one (anchor, positive, negative) triplet of snippets.

        Each snippet covers `window_size` consecutive spectrogram frames and
        is reshaped to `(frames, bins, 1)`.
        """
        ((ai, at), (_, pt), (ni, nt)) = self.generate_triplet_ids()
        win = self.window_size
        anchor = self.specs[ai][at:at + win]
        pos = self.specs[ai][pt:pt + win]
        neg = self.specs[ni][nt:nt + win]
        (t, f) = anchor.shape
        return (
            anchor.reshape((t, f, 1)),
            pos.reshape((t, f, 1)),
            neg.reshape((t, f, 1)),
        )

    def generate_triplets(self, n, win, freq):
        """Return a batch of `n` triplets as three `(n, win, freq, 1)` arrays.

        `win` and `freq` must equal the snippet shape that `generate_triplet`
        produces (frames per snippet and bins per frame).
        """
        anchor = np.zeros((n, win, freq, 1))
        pos = np.zeros((n, win, freq, 1))
        neg = np.zeros((n, win, freq, 1))
        for i in range(n):
            # Distinct local names: the batch size `n` must not be rebound.
            a_snip, p_snip, n_snip = self.generate_triplet()
            anchor[i] = a_snip
            pos[i] = p_snip
            neg[i] = n_snip
        return anchor, pos, neg
class TripletLoss(loss.Loss):
    """Hinge-style triplet loss on squared Euclidean distances."""

    def __init__(self, margin):
        """Store the margin added to the distance difference."""
        super().__init__()
        self.margin = margin

    def call(self, y_true, y_pred):
        """Return the summed triplet hinge loss.

        `y_true` is ignored. `y_pred` is indexed along its first axis:
        entries 0, 1 and 2 are used as the anchor, positive and negative
        embeddings. For each triplet the loss is
        `max(d(anchor, pos) - d(anchor, neg) + margin, 0)`, where `d` is the
        squared distance summed over the last axis; the per-triplet values
        are then summed.
        """
        anchor, pos, neg = y_pred[0], y_pred[1], y_pred[2]

        def squared_distance(other):
            # Squared Euclidean distance to the anchor along the last axis.
            return tf.reduce_sum(tf.square(tf.subtract(anchor, other)), axis=-1)

        pos_dist = squared_distance(pos)
        neg_dist = squared_distance(neg)
        margin_gap = tf.add(tf.subtract(pos_dist, neg_dist), self.margin)
        total = tf.reduce_sum(tf.maximum(margin_gap, 0.0))
        return total
def embedder(input_size, n_filters, kernel_size, out):
    """Build the network that maps one spectrogram snippet to an embedding.

    The snippet passes through a same-padded ReLU convolution and a (1, 2)
    max-pool (the layer name suggests the pooled axis is frequency). The
    last two axes are then merged so each remaining step along the first
    spatial axis becomes one input vector of a GRU, whose final state of
    size `out` is the embedding.

    Args:
        input_size: shape of one input snippet, without the batch axis.
        n_filters: number of convolution filters.
        kernel_size: convolution kernel size.
        out: number of GRU units, i.e. the embedding dimension.

    Returns:
        A Keras model with one input and one output.
    """
    snippet = layers.Input(input_size, name="spectrogram_snippet")
    conv = layers.Conv2D(
        n_filters,
        kernel_size,
        activation="relu",
        padding="same",
        name="convolution",
    )(snippet)
    pooled = layers.MaxPooling2D((1, 2), name="pool_freq")(conv)
    _, steps, bins, channels = pooled.shape
    sequence = layers.Reshape((steps, bins * channels), name="flatten2seq")(pooled)
    embedding = layers.GRU(out, return_sequences=False, name="seq2one")(sequence)
    return tf.keras.models.Model(inputs=[snippet], outputs=[embedding])
def triplets(input_size, n_filters, kernel_size, out, margin):
    """Build the three-branch training model around one shared embedder.

    A single embedding network is created with `embedder` and applied to
    three inputs named "anchor", "pos" and "neg", so all branches share the
    same weights. The resulting model is compiled with the Adam optimizer
    and a `TripletLoss` using `margin`.

    Returns:
        Tuple `(embed_net, model)`: the shared embedding network and the
        compiled three-input, three-output model.
    """
    embed_net = embedder(input_size, n_filters, kernel_size, out)
    branch_inputs = []
    branch_outputs = []
    for role in ("anchor", "pos", "neg"):
        branch_in = layers.Input(input_size, name=role)
        branch_inputs.append(branch_in)
        branch_outputs.append(embed_net(branch_in))
    model = tf.keras.models.Model(inputs=branch_inputs, outputs=branch_outputs)
    model.compile(optimizer='adam', loss=TripletLoss(margin))
    return (embed_net, model)
def spectrogram_windows(raw_audio, fft_win, step_size, feature_win, feature_step):
    """Cut the spectrogram of `raw_audio` into overlapping snippets.

    Args:
        raw_audio: 1-D sample data.
        fft_win: FFT window length in samples, passed to `spectrogram`.
        step_size: hop between FFT windows in samples.
        feature_win: number of spectrogram frames per snippet.
        feature_step: hop between snippets, in spectrogram frames.

    Returns:
        Array of shape `(rows, feature_win, bins, 1)`. Row `i` holds the
        snippet starting at frame `i * feature_step`; snippets start at
        frames strictly below `n - feature_win`. `rows` is at least
        `n // feature_step`, so rows past the last snippet remain all-zero.
    """
    spec = spectrogram(raw_audio, fft_win, step_size)
    (n, feature_out) = spec.shape
    starts = range(0, n - feature_win, feature_step)
    # The snippet count can exceed n // feature_step when feature_win is small
    # (e.g. n=11, feature_win=1, feature_step=3 yields 4 snippets but only 3
    # rows), which used to overflow the buffer. Size it to hold every snippet
    # while keeping the historical row count whenever that was already enough.
    rows = max(n // feature_step, len(starts))
    embedding = np.zeros((rows, feature_win, feature_out, 1))
    for row, t in enumerate(starts):
        embedding[row, :, :, 0] = spec[t:t + feature_win]
    return embedding
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment