Last active
July 15, 2019 21:01
-
-
Save dkohlsdorf/758af0e89118fd8625dbb3aa00630032 to your computer and use it in GitHub Desktop.
Deleting Whistles From Clicks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import matplotlib.pyplot as plt | |
import numpy as np | |
import warnings | |
import os | |
from collections import namedtuple | |
from numpy.fft import fft, ifft | |
from scipy.io import wavfile | |
from scipy.ndimage import gaussian_filter | |
warnings.filterwarnings('ignore') | |
def fwd_spectrogram(audio, win, step): | |
spectrogram = [] | |
hanning = np.hanning(win) | |
for i in range(win, len(audio), step): | |
dft = fft(audio[i - win: i]) | |
spectrogram.append(dft) | |
return np.array(spectrogram) | |
def bwd_spectrogram(spectrogram, win, step): | |
t = len(spectrogram) * step + win | |
audio = np.zeros(t) | |
scaler = np.zeros(t) | |
for (i, sample) in enumerate(spectrogram): | |
window = ifft(sample) | |
audio[i * step: (i * step) + win] += window.real | |
scaler[i * step: (i * step) + win] += 1 | |
return audio / scaler | |
def argmax(seq): | |
max_val = float('-inf') | |
max_idx = 0 | |
for i, v in enumerate(seq): | |
if v > max_val: | |
max_val = v | |
max_idx = i | |
return max_idx, max_val | |
def entropy(p): | |
return -np.sum(p * np.log(p + 1e-8)) | |
def trace(whistle): | |
w, h = whistle.shape | |
h = int(h/2) | |
whistle = np.abs(whistle[:, 0:h]) | |
whistle /= np.max(whistle) | |
dp = np.ones((w, h)) * float('-inf') | |
bp = np.zeros((w,h), dtype=np.int) | |
for f in range(h): | |
dp[0, f] = whistle[0, f] | |
for t in range(1, w): | |
if t % 100 == 0: | |
print("\t\t Time: {} Percentage Done: {}".format(t, t/ w)) | |
for f in range(0, h): | |
start = int(max(f - 10, 0)) | |
stop = int(min(f + 10 , h)) | |
bp[t,f], dp[t, f] = argmax([ | |
dp[t - 1, prev] + np.log(whistle[t - 1, prev]) for prev in range(start, stop) | |
]) | |
bp[t, f] += start | |
dp[t, f] += np.log(whistle[t, f]) | |
path = np.zeros(w, dtype=np.int) | |
scores = np.zeros(w) | |
(path[w - 1], scores[w - 1]) = argmax([dp[w - 1, f] for f in range(0, h)]) | |
i = w - 2 | |
while i > 0: | |
path[i] = bp[i, path[i + 1]] | |
scores[i] = entropy(whistle[i, :]) | |
i -= 1 | |
scores = np.convolve(scores, np.ones((64,))/64, mode='full') | |
threshold = np.percentile(scores[32:-32], 50) # Change value if needed | |
for i in range(32, len(scores) - 33): | |
if scores[i] <= threshold: | |
path[i - 32] = 0.0 | |
return path, scores[32:-32] | |
def mask(path, w, h, margin, harmonics, shift_up): | |
mask = np.ones((w, h)) | |
n = len(path) | |
for t in range(0, n): | |
if path[t] > 0.0: | |
for harmonic in range(1, harmonics + 1): | |
start = int(max(path[t] * harmonic - margin + shift_up, 0)) | |
stop = int(min(path[t] * harmonic + margin + shift_up, h / 2)) | |
for f in range(start, stop): | |
mask[t][f] = 0.0 | |
mask[t][h - f] = 0.0 | |
return mask | |
def revert_value(x, maximum): | |
if x > 0: | |
return (maximum - x) | |
else: | |
return maximum | |
def mk_click(trace, click, target_len, speedup): | |
n = len(click) | |
trace = trace * speedup | |
trace = np.array([revert_value(i, 512) for i in trace]) | |
steps = ((1.0 / np.dot(trace, trace)) * (trace)) * target_len # think linear regression | |
click_train = np.zeros(target_len) | |
scaler = np.ones(target_len) | |
next_t = 0 | |
for i in range(0, len(trace)): | |
if next_t + n < target_len: | |
click_train[next_t:next_t + n] += click | |
scaler[next_t:next_t+n] += 1 | |
next_t += int(steps[i] * trace[i]) | |
return click_train / scaler, steps | |
def generate_example(one_click, data_whistle, outfile, fs, fft_win = 1024, fft_step = 512, n_bands = 4, margin = 5, shift_up = 20, speedup = 5, debug = False): | |
print("1) Compute Spectrograms") | |
spec_whistle = fwd_spectrogram(data_whistle[:, 0], fft_win, fft_step) | |
print("2) Trace Whistle And Create Mask") | |
whistle_trace, whistle_scores = trace(spec_whistle) | |
data_click, delay_profile = mk_click(whistle_trace, one_click[:, 0], data_whistle[:, 0].shape[0], speedup) | |
spec_click = fwd_spectrogram(data_click, fft_win, fft_step) | |
whistle_mask = mask(whistle_trace, spec_click.shape[0], fft_win, margin, n_bands, shift_up) | |
print(spec_click.shape, whistle_mask.shape) | |
deleted = bwd_spectrogram(spec_click * whistle_mask, fft_win, fft_step) | |
if debug: | |
print("2.1) Debug") | |
fig=plt.figure(figsize=(10, 8), dpi= 80, facecolor='w', edgecolor='k') | |
plt.plot(whistle_trace) | |
plt.title("Whistle Trace") | |
plt.show() | |
fig=plt.figure(figsize=(10, 8), dpi= 80, facecolor='w', edgecolor='k') | |
plt.plot(delay_profile) | |
plt.title("Delay Profile") | |
plt.show() | |
fig=plt.figure(figsize=(10, 8), dpi= 80, facecolor='w', edgecolor='k') | |
plt.plot(whistle_scores) | |
plt.title("Whistle Entropy Scoring") | |
plt.show() | |
fig=plt.figure(figsize=(10, 8), dpi= 80, facecolor='w', edgecolor='k') | |
plt.imshow(whistle_mask.T) | |
plt.title("Delete Mask") | |
plt.show() | |
fig=plt.figure(figsize=(10, 8), dpi= 80, facecolor='w', edgecolor='k') | |
plt.imshow(np.abs(spec_click).T * whistle_mask.T) | |
plt.title("Spectrogram Deleted") | |
plt.show() | |
fig=plt.figure(figsize=(10, 8), dpi= 80, facecolor='w', edgecolor='k') | |
plt.imshow(np.abs(fwd_spectrogram(deleted, fft_win, fft_step).T)) | |
plt.title("Spectrogram Of Deleted Audio") | |
plt.show() | |
print("3) Write Result") | |
wavfile.write(outfile, fs, deleted * 0.001) | |
print("Done") | |
fs, data_click = wavfile.read('one_click.wav') | |
for filename in os.listdir("whistles/"): | |
if filename.endswith(".wav"): | |
print("Processing: {}".format(filename)) | |
basename = filename.replace(".wav", "") | |
infile = "whistles/{}.wav".format(basename) | |
outfile = "output/{}.wav".format(basename) | |
fs, data_whistle = wavfile.read(infile) | |
generate_example(data_click, data_whistle, outfile, fs, n_bands = 8, margin = 5, shift_up = 5, speedup = 0.5, debug=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment