Audio sync
@tin2tin, created June 27, 2024

bl_info = {
    "name": "Robust Audio Sync",
    "author": "tintwotin",
    "version": (1, 8),
    "blender": (2, 80, 0),
    "location": "Sequencer > Strip > Transform > Robust Audio Sync",
    "description": "Sync similar audio recordings",
    "warning": "",
    "doc_url": "",
    "category": "Sequencer",
}

import bpy
import numpy as np
import subprocess
import sys
import site
import re


def ensure_librosa():
    """Import librosa, installing it with pip if it is missing."""
    try:
        import librosa
    except ImportError:
        app_path = site.USER_SITE
        if app_path not in sys.path:
            sys.path.append(app_path)
        pybin = sys.executable
        subprocess.check_call([pybin, "-m", "pip", "install", "librosa"])
        import librosa
        print("librosa package installed.")
    return librosa
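# Note (assumption): sys.executable is expected to point at Blender's bundled Python
# interpreter (true for recent Blender releases), so the pip call above installs librosa
# for the interpreter that is running this add-on.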


def compute_rms_envelope(audio, sr, frame_length=2048, hop_length=512):
    """Return the per-frame RMS energy envelope of an audio signal."""
    librosa = ensure_librosa()
    print(f"Computing RMS envelope with sr={sr}, frame_length={frame_length}, hop_length={hop_length}")
    rms_envelope = librosa.feature.rms(y=audio, frame_length=frame_length, hop_length=hop_length)[0]
    print(f"RMS envelope computed: {rms_envelope}")
    return rms_envelope
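# One RMS frame advances by hop_length samples, so at sample rate sr a single envelope
# frame spans hop_length / sr seconds; the modal handler below relies on this when it
# converts the correlation offset back into seconds.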


def resample_audio(audio, original_rate, target_rate):
    librosa = ensure_librosa()
    if original_rate != target_rate:
        print(f"Resampling audio from {original_rate} Hz to {target_rate} Hz")
        audio = librosa.resample(audio, orig_sr=original_rate, target_sr=target_rate)
    return audio


def find_offset(reference_env, query_env):
    """Return the lag (in envelope frames) of the query relative to the reference."""
    print("Finding offset between reference and query envelopes")
    correlation = np.correlate(reference_env, query_env, mode='full')
    max_corr_index = np.argmax(correlation)
    # For np.correlate(a, v, mode='full'), zero lag sits at index len(v) - 1, so the
    # lag is measured against the length of the query envelope (the second argument).
    offset_frames = max_corr_index - (len(query_env) - 1)
    print(f"Offset found: {offset_frames} frames")
    return offset_frames
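# Illustrative check (values chosen for this example only): if the query envelope matches
# the reference starting two frames in, the lag comes out as 2:
#   find_offset([0.0, 0.0, 1.0, 0.0], [1.0, 0.0])  ->  2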


class SEQUENCER_OT_RobustAudioSyncOperator(bpy.types.Operator):
    bl_idname = "sequencer.robust_audio_sync"
    bl_label = "Robust Audio Sync"

    _timer = None
    _strips_to_process = []
    _current_strip_index = 0
    _reference_env = None
    _sr = None
    _hop_length = 512

    @classmethod
    def poll(cls, context):
        return (context.scene and
                context.scene.sequence_editor and
                context.scene.sequence_editor.active_strip and
                context.scene.sequence_editor.active_strip.type == "SOUND")

    def execute(self, context):
        print("Executing Robust Audio Sync Operator")
        librosa = ensure_librosa()
        active_strip = context.scene.sequence_editor.active_strip
        reference_file = bpy.path.abspath(active_strip.sound.filepath)
        print(f"Loading reference audio file: {reference_file}")

        # Load and process reference audio
        ref_audio, original_sr = librosa.load(reference_file, sr=None)
        # self._sr = context.scene.render.fps  # Assume the project's sample rate is set to FPS
        # Parse the project sample rate from the enum value, e.g. 'RATE_48000' -> 48000.
        self._sr = int(re.search(r"RATE_(\d+)", context.preferences.system.audio_sample_rate).group(1))
        self._hop_length = 1024
        print(f"Original sample rate: {original_sr}, Project sample rate: {self._sr}")
        ref_audio = resample_audio(ref_audio, original_sr, self._sr)
        self._reference_env = compute_rms_envelope(ref_audio, self._sr, hop_length=self._hop_length)

        self._strips_to_process = [strip for strip in context.selected_sequences
                                   if strip.type == "SOUND" and strip != active_strip]
        self._current_strip_index = 0

        wm = context.window_manager
        self._timer = wm.event_timer_add(0.1, window=context.window)
        wm.modal_handler_add(self)
        return {'RUNNING_MODAL'}
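
    # The operator runs modally: execute() starts a 0.1 s window timer and modal() below
    # processes one selected sound strip per TIMER tick, so the UI stays responsive while
    # librosa analyses each file.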
    def modal(self, context, event):
        if event.type == 'TIMER':
            if self._current_strip_index < len(self._strips_to_process):
                strip = self._strips_to_process[self._current_strip_index]
                query_file = bpy.path.abspath(strip.sound.filepath)
                print(f"Processing strip {self._current_strip_index + 1}/{len(self._strips_to_process)}: {query_file}")
                librosa = ensure_librosa()
                query_audio, original_sr = librosa.load(query_file, sr=None)
                print(f"Original sample rate: {original_sr}, Project sample rate: {self._sr}")
                query_audio = resample_audio(query_audio, original_sr, self._sr)
                query_env = compute_rms_envelope(query_audio, self._sr, hop_length=self._hop_length)
                offset_frames = find_offset(self._reference_env, query_env)
                # Convert envelope frames to seconds, then to timeline frames.
                offset_seconds = offset_frames * self._hop_length / self._sr
                print(f"Offset seconds: {offset_seconds}")
                frame_offset = round(offset_seconds * context.scene.render.fps / context.scene.render.fps_base)
                strip.frame_start = context.scene.sequence_editor.active_strip.frame_start + abs(frame_offset)
                print(f"Setting strip frame start to {strip.frame_start}")
                self._current_strip_index += 1
                self.report({'INFO'}, f"Processed {self._current_strip_index} of {len(self._strips_to_process)} strips")
            else:
                # All strips processed
                self.report({'INFO'}, f"Audio sync completed for {len(self._strips_to_process)} strips")
                return self.finish(context)
        return {'PASS_THROUGH'}

    def finish(self, context):
        wm = context.window_manager
        wm.event_timer_remove(self._timer)
        print("Finished processing all strips")
        return {'FINISHED'}


def draw_func(self, context):
    self.layout.separator()
    self.layout.operator("sequencer.robust_audio_sync")


def register():
    bpy.utils.register_class(SEQUENCER_OT_RobustAudioSyncOperator)
    bpy.types.SEQUENCER_MT_strip_transform.append(draw_func)
    print("Robust Audio Sync Operator registered")


def unregister():
    bpy.utils.unregister_class(SEQUENCER_OT_RobustAudioSyncOperator)
    bpy.types.SEQUENCER_MT_strip_transform.remove(draw_func)
    print("Robust Audio Sync Operator unregistered")


if __name__ == "__main__":
    register()
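
# Usage note: install this file as an add-on (Edit > Preferences > Add-ons > Install) or
# run it from Blender's Text Editor. In the Video Sequencer, select the sound strips to
# align, make the reference sound strip the active strip, and run
# Strip > Transform > Robust Audio Sync. librosa is installed via pip on first use, so the
# first run may need network access and take a moment.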