Skip to content

Instantly share code, notes, and snippets.

@dkohlsdorf
Last active July 15, 2019 21:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dkohlsdorf/758af0e89118fd8625dbb3aa00630032 to your computer and use it in GitHub Desktop.
Save dkohlsdorf/758af0e89118fd8625dbb3aa00630032 to your computer and use it in GitHub Desktop.
Deleting Whistles From Clicks
import matplotlib.pyplot as plt
import numpy as np
import warnings
import os
from collections import namedtuple
from numpy.fft import fft, ifft
from scipy.io import wavfile
from scipy.ndimage import gaussian_filter
warnings.filterwarnings('ignore')
def fwd_spectrogram(audio, win, step):
spectrogram = []
hanning = np.hanning(win)
for i in range(win, len(audio), step):
dft = fft(audio[i - win: i])
spectrogram.append(dft)
return np.array(spectrogram)
def bwd_spectrogram(spectrogram, win, step):
t = len(spectrogram) * step + win
audio = np.zeros(t)
scaler = np.zeros(t)
for (i, sample) in enumerate(spectrogram):
window = ifft(sample)
audio[i * step: (i * step) + win] += window.real
scaler[i * step: (i * step) + win] += 1
return audio / scaler
def argmax(seq):
max_val = float('-inf')
max_idx = 0
for i, v in enumerate(seq):
if v > max_val:
max_val = v
max_idx = i
return max_idx, max_val
def entropy(p):
return -np.sum(p * np.log(p + 1e-8))
def trace(whistle):
w, h = whistle.shape
h = int(h/2)
whistle = np.abs(whistle[:, 0:h])
whistle /= np.max(whistle)
dp = np.ones((w, h)) * float('-inf')
bp = np.zeros((w,h), dtype=np.int)
for f in range(h):
dp[0, f] = whistle[0, f]
for t in range(1, w):
if t % 100 == 0:
print("\t\t Time: {} Percentage Done: {}".format(t, t/ w))
for f in range(0, h):
start = int(max(f - 10, 0))
stop = int(min(f + 10 , h))
bp[t,f], dp[t, f] = argmax([
dp[t - 1, prev] + np.log(whistle[t - 1, prev]) for prev in range(start, stop)
])
bp[t, f] += start
dp[t, f] += np.log(whistle[t, f])
path = np.zeros(w, dtype=np.int)
scores = np.zeros(w)
(path[w - 1], scores[w - 1]) = argmax([dp[w - 1, f] for f in range(0, h)])
i = w - 2
while i > 0:
path[i] = bp[i, path[i + 1]]
scores[i] = entropy(whistle[i, :])
i -= 1
scores = np.convolve(scores, np.ones((64,))/64, mode='full')
threshold = np.percentile(scores[32:-32], 50) # Change value if needed
for i in range(32, len(scores) - 33):
if scores[i] <= threshold:
path[i - 32] = 0.0
return path, scores[32:-32]
def mask(path, w, h, margin, harmonics, shift_up):
mask = np.ones((w, h))
n = len(path)
for t in range(0, n):
if path[t] > 0.0:
for harmonic in range(1, harmonics + 1):
start = int(max(path[t] * harmonic - margin + shift_up, 0))
stop = int(min(path[t] * harmonic + margin + shift_up, h / 2))
for f in range(start, stop):
mask[t][f] = 0.0
mask[t][h - f] = 0.0
return mask
def revert_value(x, maximum):
if x > 0:
return (maximum - x)
else:
return maximum
def mk_click(trace, click, target_len, speedup):
n = len(click)
trace = trace * speedup
trace = np.array([revert_value(i, 512) for i in trace])
steps = ((1.0 / np.dot(trace, trace)) * (trace)) * target_len # think linear regression
click_train = np.zeros(target_len)
scaler = np.ones(target_len)
next_t = 0
for i in range(0, len(trace)):
if next_t + n < target_len:
click_train[next_t:next_t + n] += click
scaler[next_t:next_t+n] += 1
next_t += int(steps[i] * trace[i])
return click_train / scaler, steps
def generate_example(one_click, data_whistle, outfile, fs, fft_win = 1024, fft_step = 512, n_bands = 4, margin = 5, shift_up = 20, speedup = 5, debug = False):
print("1) Compute Spectrograms")
spec_whistle = fwd_spectrogram(data_whistle[:, 0], fft_win, fft_step)
print("2) Trace Whistle And Create Mask")
whistle_trace, whistle_scores = trace(spec_whistle)
data_click, delay_profile = mk_click(whistle_trace, one_click[:, 0], data_whistle[:, 0].shape[0], speedup)
spec_click = fwd_spectrogram(data_click, fft_win, fft_step)
whistle_mask = mask(whistle_trace, spec_click.shape[0], fft_win, margin, n_bands, shift_up)
print(spec_click.shape, whistle_mask.shape)
deleted = bwd_spectrogram(spec_click * whistle_mask, fft_win, fft_step)
if debug:
print("2.1) Debug")
fig=plt.figure(figsize=(10, 8), dpi= 80, facecolor='w', edgecolor='k')
plt.plot(whistle_trace)
plt.title("Whistle Trace")
plt.show()
fig=plt.figure(figsize=(10, 8), dpi= 80, facecolor='w', edgecolor='k')
plt.plot(delay_profile)
plt.title("Delay Profile")
plt.show()
fig=plt.figure(figsize=(10, 8), dpi= 80, facecolor='w', edgecolor='k')
plt.plot(whistle_scores)
plt.title("Whistle Entropy Scoring")
plt.show()
fig=plt.figure(figsize=(10, 8), dpi= 80, facecolor='w', edgecolor='k')
plt.imshow(whistle_mask.T)
plt.title("Delete Mask")
plt.show()
fig=plt.figure(figsize=(10, 8), dpi= 80, facecolor='w', edgecolor='k')
plt.imshow(np.abs(spec_click).T * whistle_mask.T)
plt.title("Spectrogram Deleted")
plt.show()
fig=plt.figure(figsize=(10, 8), dpi= 80, facecolor='w', edgecolor='k')
plt.imshow(np.abs(fwd_spectrogram(deleted, fft_win, fft_step).T))
plt.title("Spectrogram Of Deleted Audio")
plt.show()
print("3) Write Result")
wavfile.write(outfile, fs, deleted * 0.001)
print("Done")
fs, data_click = wavfile.read('one_click.wav')
for filename in os.listdir("whistles/"):
if filename.endswith(".wav"):
print("Processing: {}".format(filename))
basename = filename.replace(".wav", "")
infile = "whistles/{}.wav".format(basename)
outfile = "output/{}.wav".format(basename)
fs, data_whistle = wavfile.read(infile)
generate_example(data_click, data_whistle, outfile, fs, n_bands = 8, margin = 5, shift_up = 5, speedup = 0.5, debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment