"""Functions to estimate the offset between two videos using audio, broken down by chunks."""
import io
import subprocess

import numpy as np
import scipy.signal as signal
import matplotlib.pyplot as plt
import soundfile as sf
from tqdm import trange

def get_audio_data(video_path, sample_rate):
    """Extract the mono audio track of a video as a float array, via ffmpeg."""
    # Decode audio only (-vn), downmix to mono (-ac 1), resample to
    # sample_rate, and pipe WAV out on stdout so no temporary file is needed.
    result = subprocess.run(['ffmpeg', '-i', video_path, '-vn', '-ac', '1', '-f', 'wav',
                             '-ar', str(sample_rate), '-'], capture_output=True)
    audio_data, fs = sf.read(io.BytesIO(result.stdout))
    return audio_data
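
# Example usage (hypothetical path; assumes ffmpeg is on the PATH):
#   audio = get_audio_data('/path/to/video.mkv', 44000)
#   print(len(audio) / 44000, 'seconds of mono audio')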

def get_frame_rate(video_path):
    """Read the average frame rate of the first video stream with ffprobe."""
    cmd = ['ffprobe', '-v', '0', '-of', 'csv=p=0', '-select_streams', 'v:0',
           '-show_entries', 'stream=avg_frame_rate', video_path]
    output = subprocess.run(cmd, capture_output=True).stdout
    # ffprobe reports the rate as a fraction such as "30000/1001"
    num, denom = output.decode('utf-8').strip().split('/')
    return float(num) / float(denom)
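
# Not used in the script below, but handy for converting a time offset in
# seconds into a frame count, e.g. (hypothetical):
#   n_frames = round(t_offset * get_frame_rate(video1_path))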

def get_audio_offset_simple(audio1, audio2, sample_rate, max_offset=None):
    """Estimate the offset of audio2 relative to audio1 by cross-correlation.

    Returns (offset, ratio): offset in seconds (positive means the shared
    content appears later in audio2, i.e. video 2 started recording earlier)
    and the peak-to-median ratio of the correlation as a rough confidence.
    """
    # Convolving a reversed signal is equivalent to cross-correlation.
    corr = signal.fftconvolve(audio1[::-1], audio2, mode='full')
    time_shift = np.arange(-len(audio1) + 1, len(audio2)) / sample_rate
    # Optionally restrict the search to lags within +/- max_offset seconds.
    valid = np.ones(len(time_shift), dtype='bool')
    if max_offset is not None:
        valid[np.abs(time_shift) > max_offset] = False
    ix = np.argmax(corr[valid])
    offset = time_shift[valid][ix]
    # Use the peak within the valid window, consistent with the offset above.
    ratio = np.max(corr[valid]) / np.median(np.abs(corr))
    return offset, ratio
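
# Sanity check of the sign convention (synthetic, not from the original gist):
# delaying a signal by 0.5 s should give an offset of about +0.5.
#   rng = np.random.default_rng(0)
#   a = rng.standard_normal(2 * 44000)
#   b = np.concatenate([np.zeros(22000), a])
#   print(get_audio_offset_simple(a, b, 44000)[0])  # ~ 0.5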

def get_audio_offset(audio1, audio2, sample_rate):
    """Like get_audio_offset_simple, but first smooths the correlation to
    locate a coarse peak region, then refines within +/- 1 second of it."""
    corr = signal.fftconvolve(audio1[::-1], audio2, mode='full')
    # Smooth |corr| with a 1/8-second box filter to find the peak region.
    ccs = signal.fftconvolve(np.abs(corr), np.ones(sample_rate // 8), mode='same')
    ix = np.argmax(ccs)
    # Refine: raw correlation peak within 1 second of the coarse peak
    # (clamped at 0 so the slice stays valid near the start).
    lo = max(ix - sample_rate, 0)
    ix_new = np.argmax(corr[lo:ix + sample_rate]) + lo
    time_shift = np.arange(-len(audio1) + 1, len(audio2))
    offset = time_shift[ix_new] / sample_rate
    return offset
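
# On the synthetic pair above, this estimator should agree:
#   print(get_audio_offset(a, b, 44000))  # ~ 0.5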

def get_specgram_offset(signal1, signal2, sample_rate, nperseg=8192, freq_limit=None, **kwargs):
    """Estimate the offset by cross-correlating spectrogram rows.

    Resolution is limited by the spectrogram hop, so this is only a coarse
    estimate; frequencies at or above freq_limit (Hz) are ignored.
    """
    f, t1, spec1 = signal.spectrogram(signal1, sample_rate, nperseg=nperseg, **kwargs)
    f, t2, spec2 = signal.spectrogram(signal2, sample_rate, nperseg=nperseg, **kwargs)
    if freq_limit is None:
        freq_limit = np.inf
    power1 = spec1[f < freq_limit]
    power2 = spec2[f < freq_limit]
    # Cross-correlate each frequency band and average the normalized results.
    x = np.zeros(power1.shape[1] + power2.shape[1] - 1)
    for i in range(power1.shape[0]):
        c = signal.fftconvolve(power1[i, ::-1], power2[i], mode='full')
        x += c / np.abs(np.max(c))
    corr = x / power1.shape[0]
    # Convert the best lag (in spectrogram frames) back to seconds.
    time_shift = np.arange(-len(t1) + 1, len(t2))
    ix = np.argmax(corr)
    offset = time_shift[ix]
    if offset >= 0:
        return t2[offset]
    else:
        return -t1[abs(offset)]
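
# Example of a coarse pass on full-length recordings (same parameters as the
# multiscale function below uses), to be refined with sample-level
# cross-correlation afterwards:
#   t0 = get_specgram_offset(audio1, audio2, sample_rate, nperseg=16384,
#                            noverlap=0, freq_limit=8000)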

def get_multiscale_specgram_offset(signal1, signal2, sample_rate, freq_limit=8000):
    """Two-stage offset estimate: a coarse spectrogram pass on the full
    recordings, refined by sample-level cross-correlation on a 2-minute crop.

    Assumes the two recordings overlap for at least ~135 seconds.
    """
    t_offset = get_specgram_offset(signal1, signal2, sample_rate, nperseg=16384, noverlap=0,
                                   freq_limit=freq_limit)
    # Trim the head of whichever recording leads by the coarse offset, then
    # compare a window from 15 s to 135 s into the overlapping region.
    clip = int(round(abs(t_offset) * sample_rate))
    start = sample_rate * 15
    end = start + sample_rate * 120
    if t_offset < 0:
        crop1 = signal1[clip+start : clip+end]
        crop2 = signal2[start : end]
    else:
        crop1 = signal1[start : end]
        crop2 = signal2[clip+start : clip+end]
    # t_offset_add = get_specgram_offset(crop1, crop2, sample_rate, nperseg=256,
    #                                    noverlap=None, freq_limit=8000)
    t_offset_add = get_audio_offset(crop1, crop2, sample_rate)
    return t_offset + t_offset_add

# Example run on two GoPro recordings of the same session
video1_path = '/home/lili/data/dancing/disco/compressed/2023-04-14/gopro1.mkv'
video2_path = '/home/lili/data/dancing/disco/compressed/2023-04-14/gopro3.mkv'
sample_rate = 44000
audio1 = get_audio_data(video1_path, sample_rate)
audio2 = get_audio_data(video2_path, sample_rate)
signal1 = audio1
signal2 = audio2
# Global offset between the two videos
t_offset = get_multiscale_specgram_offset(audio1, audio2, sample_rate)

# Trim the head of whichever recording leads, then cut both to equal length
clip = int(round(abs(t_offset) * sample_rate))
if t_offset < 0:
    crop1 = signal1[clip:]
    crop2 = signal2
else:
    crop1 = signal1
    crop2 = signal2[clip:]
m = min(len(crop1), len(crop2))
crop1 = crop1[:m]
crop2 = crop2[:m]

# Low-pass both tracks at 500 Hz so the chunk-wise correlation below keys on
# low-frequency content rather than high-frequency noise
sos = signal.butter(3, 500, btype='lowpass', fs=sample_rate, output='sos')
crop1f = signal.sosfiltfilt(sos, crop1)
crop2f = signal.sosfiltfilt(sos, crop2)
# Estimate the residual offset in consecutive 30-second chunks to see how
# the two clocks drift apart over the recording
interval = sample_rate * 30
n_chunks = int(len(crop1) / interval)
offsets = np.zeros(n_chunks)
ratios = np.zeros(n_chunks)
for i in trange(n_chunks, ncols=70):
    start = interval * i
    end = start + interval
    c1 = crop1f[start:end]
    c2 = crop2f[start:end]
    t, ratio = get_audio_offset_simple(c1, c2, sample_rate)
    # t = get_audio_offset(c1, c2, sample_rate)
    # t = get_specgram_offset(c1, c2, sample_rate, nperseg=256, freq_limit=7000)
    offsets[i] = t
    ratios[i] = ratio

# Plot the per-chunk offset over time
minutes = np.arange(len(offsets)) * (interval / sample_rate) / 60.0
plt.figure(1)
plt.clf()
plt.plot(minutes, offsets)
# plt.plot(ratios / 500)
plt.ylabel('Offset (s)')
plt.xlabel('Time (min)')
plt.title('Offset between GoPro 1 and GoPro 3')
plt.draw()
plt.show(block=False)
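
# Possible follow-up (not in the original gist): fit a line to the per-chunk
# offsets to estimate the relative clock drift between the two cameras, using
# the peak-to-median ratio as a rough quality filter:
#   good = ratios > np.median(ratios)
#   slope, intercept = np.polyfit(minutes[good], offsets[good], 1)
#   print(f'drift: {slope * 1000:.2f} ms per minute')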