Created
December 6, 2023 17:57
-
-
Save lambdaloop/e2c50f9bfbadcae8407d9c1a89b9052b to your computer and use it in GitHub Desktop.
Functions to estimate the offset between two videos using audio, broken down by chunks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import scipy.signal as signal | |
import matplotlib.pyplot as plt | |
import subprocess | |
import scipy.io | |
import io | |
import soundfile as sf | |
from pyts.metrics import dtw | |
def get_audio_data(video_path, sample_rate):
    """Extract the mono audio track of a video as a float sample array.

    Uses ffmpeg to decode the audio stream, downmix it to one channel, and
    resample it to `sample_rate`, piping a WAV stream back over stdout.

    Parameters
    ----------
    video_path : str
        Path to the video file.
    sample_rate : int
        Target sampling rate in Hz.

    Returns
    -------
    numpy.ndarray
        1-D array of audio samples.

    Raises
    ------
    RuntimeError
        If ffmpeg exits with a non-zero status.
    """
    cmd = ['ffmpeg', '-i', video_path, '-vn', '-ac', '1', '-f', 'wav',
           '-ar', str(sample_rate), '-']
    result = subprocess.run(cmd, capture_output=True)
    # Fail loudly with ffmpeg's own error text instead of letting
    # soundfile choke on an empty/truncated stream.
    if result.returncode != 0:
        raise RuntimeError(
            f'ffmpeg failed for {video_path}: '
            f'{result.stderr.decode("utf-8", "replace")}')
    audio_data, _fs = sf.read(io.BytesIO(result.stdout))
    return audio_data
def get_frame_rate(video_path):
    """Return the average frame rate of a video's first video stream.

    Runs ffprobe and parses its `avg_frame_rate` field, which is reported
    as a fraction such as "30000/1001".

    Parameters
    ----------
    video_path : str
        Path to the video file.

    Returns
    -------
    float
        Frames per second.

    Raises
    ------
    ValueError
        If ffprobe reports an undefined frame rate (denominator of 0).
    """
    cmd = ['ffprobe', '-v', '0', '-of', 'csv=p=0', '-select_streams', 'v:0',
           '-show_entries', 'stream=avg_frame_rate', video_path]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    frame_rate_str = result.stdout.decode('utf-8').strip()
    num, denom = frame_rate_str.split('/')
    # ffprobe reports "0/0" when the rate is unknown; surface that clearly
    # instead of raising a bare ZeroDivisionError.
    if float(denom) == 0:
        raise ValueError(f'undefined frame rate for {video_path}: {frame_rate_str}')
    return float(num) / float(denom)
def get_audio_offset_simple(audio1, audio2, sample_rate, max_offset=None):
    """Estimate the time offset of `audio2` relative to `audio1`.

    Cross-correlates the two signals (FFT convolution of the reversed first
    signal with the second) and returns the lag of the highest correlation.
    A positive offset means `audio2` lags `audio1`.

    Parameters
    ----------
    audio1, audio2 : numpy.ndarray
        1-D audio signals sampled at `sample_rate`.
    sample_rate : int
        Sampling rate of both signals, in Hz.
    max_offset : float, optional
        If given, only lags within +/- `max_offset` seconds are considered.

    Returns
    -------
    offset : float
        Estimated offset in seconds.
    ratio : float
        Peak-to-median correlation ratio, a rough confidence measure.
    """
    corr = signal.fftconvolve(audio1[::-1], audio2, mode='full')
    # Lag (in seconds) corresponding to each element of `corr`.
    time_shift = np.arange(-len(audio1) + 1, len(audio2)) / sample_rate
    valid = np.ones(len(time_shift), dtype='bool')
    if max_offset is not None:
        valid[np.abs(time_shift) > max_offset] = False
    corr_valid = corr[valid]
    ix = np.argmax(corr_valid)
    offset = time_shift[valid][ix]
    # Bug fix: use the peak actually selected (within the valid window) so
    # the confidence ratio describes the reported offset even when
    # max_offset excludes the global correlation maximum.
    ratio = corr_valid[ix] / np.median(np.abs(corr))
    return offset, ratio
def get_audio_offset(audio1, audio2, sample_rate):
    """Estimate the time offset of `audio2` relative to `audio1`.

    Cross-correlates the signals, smooths the absolute correlation with a
    1/8-second box filter to locate the broad peak region, then refines the
    estimate to the sharpest correlation peak within one second of it.

    Parameters
    ----------
    audio1, audio2 : numpy.ndarray
        1-D audio signals sampled at `sample_rate`.
    sample_rate : int
        Sampling rate of both signals, in Hz.

    Returns
    -------
    float
        Estimated offset in seconds (positive if `audio2` lags `audio1`).
    """
    corr = signal.fftconvolve(audio1[::-1], audio2, mode='full')
    # Smoothed envelope locates the neighborhood of the true peak.
    ccs = signal.fftconvolve(np.abs(corr), np.ones(sample_rate // 8), mode='same')
    ix = np.argmax(ccs)
    # Bug fix: clamp the refinement window at 0.  When the peak lies within
    # the first second, `ix - sample_rate` is negative and the original
    # slice wrapped around, yielding an empty (or wrong) window.
    lo = max(ix - sample_rate, 0)
    hi = ix + sample_rate
    ix_new = np.argmax(corr[lo:hi]) + lo
    time_shift = np.arange(-len(audio1) + 1, len(audio2))
    return time_shift[ix_new] / sample_rate
def get_specgram_offset(signal1, signal2, sample_rate, nperseg=8192, freq_limit=None, **args): | |
f, t1, stft1 = signal.spectrogram(signal1, sample_rate, nperseg=nperseg, **args) | |
f, t2, stft2 = signal.spectrogram(signal2, sample_rate, nperseg=nperseg, **args) | |
if freq_limit is None: | |
freq_limit = np.inf | |
power1 = stft1[f < freq_limit] | |
power2 = stft2[f < freq_limit] | |
x = np.zeros(power1.shape[1] + power2.shape[1] - 1) | |
for i in range(power1.shape[0]): | |
c = signal.fftconvolve(power1[i, ::-1], power2[i], mode='full') | |
x += c / np.abs(np.max(c)) | |
corr = x / power1.shape[0] | |
time_shift = np.arange(-len(t1) + 1, len(t2)) | |
ix = np.argmax(corr) | |
offset = time_shift[ix] | |
if offset >= 0: | |
return t2[offset] | |
else: | |
return -1 * t1[abs(offset)] | |
def get_multiscale_specgram_offset(signal1, signal2, sample_rate, freq_limit=8000,
                                   refine_start=15, refine_duration=120):
    """Estimate the offset between two signals in two passes.

    A coarse pass cross-correlates low-resolution spectrograms of the full
    signals; a fine pass aligns the raw audio of a short excerpt after
    compensating for the coarse offset.  The two estimates are summed.

    Parameters
    ----------
    signal1, signal2 : numpy.ndarray
        1-D audio signals sampled at `sample_rate`.
    sample_rate : int
        Sampling rate in Hz.
    freq_limit : float
        Upper frequency bound (Hz) used in the coarse spectrogram pass.
    refine_start : float
        Where the fine-pass excerpt begins, in seconds from the start of
        the coarsely-aligned signals (default 15, as before).
    refine_duration : float
        Length of the fine-pass excerpt, in seconds (default 120, as before).

    Returns
    -------
    float
        Estimated offset in seconds (positive if `signal2` lags `signal1`).
    """
    # Coarse estimate from non-overlapping, low-resolution spectrograms.
    t_offset = get_specgram_offset(signal1, signal2, sample_rate, nperseg=16384,
                                   noverlap=0, freq_limit=freq_limit)
    clip = int(round(abs(t_offset) * sample_rate))
    start = int(sample_rate * refine_start)
    end = start + int(sample_rate * refine_duration)
    # Drop the coarse offset from whichever signal leads, then refine on a
    # short excerpt where the two should already be roughly aligned.
    if t_offset < 0:
        crop1 = signal1[clip + start : clip + end]
        crop2 = signal2[start : end]
    else:
        crop1 = signal1[start : end]
        crop2 = signal2[clip + start : clip + end]
    t_offset_add = get_audio_offset(crop1, crop2, sample_rate)
    return t_offset + t_offset_add
# --- Load the audio tracks of the two videos to be aligned ---
video1_path = '/home/lili/data/dancing/disco/compressed/2023-04-14/gopro1.mkv'
video2_path = '/home/lili/data/dancing/disco/compressed/2023-04-14/gopro3.mkv'
sample_rate = 44000

audio1 = get_audio_data(video1_path, sample_rate)
audio2 = get_audio_data(video2_path, sample_rate)
signal1 = audio1
signal2 = audio2

# Global offset between the two recordings, estimated coarse-to-fine.
t_offset = get_multiscale_specgram_offset(audio1, audio2, sample_rate)
clip = int(round(abs(t_offset) * sample_rate))

# Trim the leading samples of whichever recording started first, so both
# signals begin at (approximately) the same wall-clock moment.
if t_offset < 0:
    crop1, crop2 = signal1[clip:], signal2
else:
    crop1, crop2 = signal1, signal2[clip:]

# Truncate both to a common length.
m = min(len(crop1), len(crop2))
crop1, crop2 = crop1[:m], crop2[:m]

# Low-pass both signals (zero-phase) to suppress high-frequency content
# before the per-chunk drift measurement.
sos = signal.butter(3, 500, btype='lowpass', fs=sample_rate, output='sos')
crop1f = signal.sosfiltfilt(sos, crop1)
crop2f = signal.sosfiltfilt(sos, crop2)
# Measure how the offset drifts over time by re-estimating it on
# successive 30-second chunks of the aligned, low-passed signals.
from tqdm import trange

interval = sample_rate * 30  # chunk length in samples (30 s)
n_chunks = int(len(crop1) / interval)
offsets = np.zeros(n_chunks)
ratios = np.zeros(n_chunks)
for i in trange(n_chunks, ncols=70):
    start = interval * i
    end = start + interval
    c1 = crop1f[start:end]
    c2 = crop2f[start:end]
    t, ratio = get_audio_offset_simple(c1, c2, sample_rate)
    offsets[i] = t
    ratios[i] = ratio

# Plot residual offset (seconds) against recording time (minutes).
minutes = np.arange(len(offsets)) * (interval / sample_rate) / 60.0
plt.figure(1)
plt.clf()
plt.plot(minutes, offsets)
plt.ylabel('Offset (s)')
plt.xlabel('Time (min)')
# Title fixed: "Offest" -> "Offset"; the second camera is gopro3
# (see video2_path above), not GoPro 2.
plt.title("Offset between GoPro 1 and GoPro 3")
plt.draw()
plt.show(block=False)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment