A very rough python script to synchronise dubbed audio from different videos.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python
"""
audio_dub_sync.py

This script will take a Pair of VCD video files as well as a High Quality Blu Ray Rip
and will synchronise the audio and add it as a new track.
"""
# Import Libraries
from pathlib import Path
import os
import math
import librosa #You will need llvm installed as well: brew install llvm
import subprocess
import numpy as np
import argparse
import ffmpeg

# Sample rate (Hz) used for the extracted/generated WAV files (see sox/ffmpeg calls below).
SAMPLE_RATE= 44100
def import_audio(filename, output_file, offset=0):
    """Extract a mono 32-bit float WAV track from *filename* into *output_file*.

    Parameters
    ----------
    filename : path-like
        Source media file to extract audio from.
    output_file : path-like
        Destination WAV path. If it already exists it is treated as a
        cache and returned untouched.
    offset : int, optional
        Milliseconds to trim from the start of the audio. The sign is
        ignored: callers pass negative values to mean "drop this much
        leading audio", and ffmpeg's atrim does not accept a negative
        start time.

    Returns
    -------
    The path of the (possibly pre-existing) output file.
    """
    # Cached result: skip the slow ffmpeg run if the file is already there.
    if os.path.isfile(output_file):
        return output_file

    stream = ffmpeg.input(str(filename))
    if offset != 0:
        # Trim by magnitude -- a negative atrim start is invalid and the
        # original callers pass negative offsets for "trim from start".
        start = str(abs(offset)) + 'ms'
        print(f'Trimming the audio! {start}')
        # asetpts resets timestamps so the trimmed audio starts at t=0.
        stream = stream.filter_('atrim', start=start).filter_('asetpts', 'PTS-STARTPTS')
    stream = ffmpeg.output(stream, str(output_file), acodec='pcm_f32le', ac=1, ar='44100')
    ffmpeg.run(stream, quiet=True)
    return output_file
def join_audio_streams(audio_files, workingDirectory):
    """Concatenate *audio_files* into workingDirectory/'merged.wav' via sox.

    Uses a list-form subprocess call (no shell) so paths containing
    spaces or shell metacharacters are passed through safely, instead of
    the previous space-joined shell string.
    Returns the path of the merged file.
    """
    output_filename = workingDirectory / 'merged.wav'
    command = ['sox'] + [str(item) for item in audio_files] + [str(output_filename)]
    print(' '.join(command))
    subprocess.call(command)
    return output_filename
def generate_silence(ms, directory):
    """Create a mono WAV containing *ms* milliseconds of silence.

    The file is written to *directory* as e.g. 'silence1500.wav' at
    SAMPLE_RATE Hz and its path is returned. Uses a list-form subprocess
    call (no shell) so the output path cannot break the command line.
    """
    filepath = directory / f'silence{ms}.wav'
    command = ['sox', '-n', '-r', str(SAMPLE_RATE), '-c', '1',
               str(filepath), 'trim', '0.0', str(ms / 1000)]
    print(' '.join(command))
    subprocess.call(command)
    return filepath
def split_and_import_audio(filename, offset=0):
    """Convert *filename* to a cached WAV and load one minute of it.

    The WAV conversion (import_audio) is skipped when the '.converted.wav'
    file already exists. *offset* is the number of seconds into the track
    at which librosa starts reading. Returns (samples, sample_rate).
    """
    wav_path = import_audio(filename, filename.with_suffix('.converted.wav'))
    return librosa.load(wav_path, duration=60, offset=offset)
def process_signal(o):
    """Turn an onset-strength envelope into a smeared 0..1 event signal.

    The signal is z-normalised, thresholded at 2 standard deviations into
    a 0/1 spike train, and each spike is then smeared in both directions
    with a geometric decay (0.9 per sample) so that nearly-aligned events
    in two signals still overlap when compared.
    Returns the processed numpy array.
    """
    # z-normalise, then keep only strong (> 2 sigma) onsets as 1.0 spikes.
    o = (o - np.mean(o)) / np.std(o)
    o = np.where(o > 2, 1.0, 0.0)
    # Forward pass: decay each spike rightwards.
    for i in range(1, len(o)):
        o[i] = max(o[i], o[i - 1] * 0.9)
    # Backward pass: decay each spike leftwards. The stop is -1 so index 0
    # is included -- the original stopped at 1, leaving the very first
    # sample un-smeared (off-by-one vs. the forward pass, which covers
    # the last sample).
    for i in range(len(o) - 2, -1, -1):
        o[i] = max(o[i], o[i + 1] * 0.9)
    return o
def find_offset(x0, x1):
    """Search shifts in [-600, 600) for the best alignment of x1 against x0.

    Returns (offset, error), where the offset is negated so its sign
    describes how to move x1 to line up with x0, and error is the
    measure_error value at that best shift.
    """
    best_err, best_shift = min(
        (measure_error(x0, x1, shift), shift) for shift in range(-600, 600)
    )
    return -best_shift, best_err
def measure_error(x0, x1, offset):
    """Mean squared error between x0 and x1 with x1 shifted by *offset*.

    Both signals are truncated to the shorter length before comparing;
    the shift wraps around (np.roll).
    """
    n = min(len(x0), len(x1))
    residual = x0[:n] - np.roll(x1[:n], offset)
    return np.mean(residual ** 2)
def calculate_correct_offset(hq_x, hq_sr, lq_x, lq_sr):
    """Estimate, in milliseconds, how far the LQ audio is shifted vs the HQ audio.

    Both signals are normalised to zero mean / unit variance (equal
    average volume), reduced to onset-strength envelopes, smeared into
    event signals (process_signal) and cross-compared over a range of
    shifts (find_offset). The winning shift is converted from onset
    frames to milliseconds.
    """
    # Equalise average amplitude (volume) of the two signals.
    lq_norm = (lq_x - np.mean(lq_x)) / np.std(lq_x)
    hq_norm = (hq_x - np.mean(hq_x)) / np.std(hq_x)
    # Onset strength: where events (notes, syllables, effects) begin.
    lq_onsets = librosa.onset.onset_strength(lq_norm, sr=lq_sr)
    hq_onsets = librosa.onset.onset_strength(hq_norm, sr=hq_sr)
    # Smear the spike trains so near-misses still correlate, then search.
    offset, error = find_offset(process_signal(lq_onsets), process_signal(hq_onsets))
    print(f'Raw offset: {offset}, error: {error}')
    # Frames -> ms. NOTE(review): assumes a hop of 1024 samples at
    # SAMPLE_RATE Hz -- confirm against the librosa defaults in use.
    return ((offset * 1024) / SAMPLE_RATE) * 1000
if __name__ == '__main__':
    # Initiate the parser
    parser = argparse.ArgumentParser()
    parser.add_argument("hqfile", type=str, help="The High Quality File to add the new audio stream to.")
    parser.add_argument("lqfiles", type=str, help="The Low Quality Files to rip the audio stream from, can be comma separated.")
    parser.add_argument("-V", "--version", help="show program version", action="store_true")
    # Read arguments from the command line
    args = parser.parse_args()
    # Check for --version or -V
    # NOTE(review): this prints the version but does not exit, so a normal
    # sync run still continues below -- confirm that is intended.
    if args.version:
        print("This is audio_dub_sync version 0.1")
    hq_path = Path(args.hqfile)
    print(f'High Quality File: {hq_path}')
    print()
    print(f'Low Quality Files:')
    lq_files = []
    for lqfile in args.lqfiles.split(','):
        print(lqfile)
        lq_files.append(Path(lqfile))
    print()
    # Iterate through the Low Quality files, and find offesets against the HQ version
    # hq_offset walks forward through the HQ file as each LQ part is
    # matched, so every LQ file is compared against the HQ section that
    # follows the parts already matched.
    hq_offset = 0
    lq_offsets = []
    #lq_audio_files = []
    for lq_file in lq_files:
        print(f'Synchronising {lq_file}.')
        # Load one minute of audio from each side (HQ starting at hq_offset seconds).
        hq_x, hq_sr = split_and_import_audio(hq_path, hq_offset)
        lq_x, lq_sr = split_and_import_audio(lq_file, 0)
        #lq_audio_files.append(lq_audio)
        lq_duration = librosa.get_duration(filename=str(lq_file))
        # Millisecond offset of this LQ part relative to the HQ section.
        raw_offset = calculate_correct_offset(hq_x, hq_sr, lq_x, lq_sr )
        abs_offset = abs(raw_offset)
        trunc_offset = math.trunc(raw_offset)
        print(f'Raw: {raw_offset}, trunc: {trunc_offset}')
        offset = trunc_offset
        if(offset > 0):
            # HACK: empirically-needed doubling for positive offsets.
            offset = offset * 2 # I have no idea why, but totoro made me do this!
        lq_offsets.append(offset)
        print(f'Duration of {lq_file} is {lq_duration} seconds.')
        if(offset < 0):
            offset = abs(offset)
        # Advance the HQ search position by this part's duration,
        # corrected by the offset just found (ms -> s).
        hq_offset += (lq_duration - offset / 1000)
        print(f'Setting HQ Offset to: {hq_offset}')
        print()
    print(lq_offsets)
    trimmed_audio_files = []
    #lq_x, lq_sr = split_and_import_audio(lq_files[0], 0)
    workingDirectory = Path(os.path.dirname(lq_files[0]))
    audio_to_join = []
    # Build the ordered list of audio segments to concatenate, one per LQ part.
    for off in range(len(lq_offsets)):
        if(lq_offsets[off] < 0):
            # Standard Offset, trim audio from clip
            trimmed_fn = lq_files[off].with_suffix('.trimmed.wav')
            import_audio(lq_files[off],trimmed_fn, lq_offsets[off])
            audio_to_join.append(trimmed_fn)
        if(lq_offsets[off] > 0):
            # Strange, we need to add some silence as that's the best we can do right now.
            silence_file = generate_silence(lq_offsets[off] , workingDirectory)
            print(f'Saved Silence file of {lq_offsets[off]}ms to {silence_file}')
            audio_to_join.append(silence_file)
            # NOTE(review): this appends a '.trimmed.wav' path that is only
            # created by the negative-offset branch above; for a purely
            # positive offset that file may not exist yet -- verify.
            audio_to_join.append(lq_files[off].with_suffix('.trimmed.wav'))
        # NOTE(review): an offset of exactly 0 contributes nothing to audio_to_join.
    print(f'Generated List of files to join: {audio_to_join}')
    merged_filename = join_audio_streams(audio_to_join, workingDirectory)
    print(f'Merged Audio File: {merged_filename}')
It was the third obvious choice: ffmpeg-python
🤦🏻♂️
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Which ffmpeg package are you using?
I tried the obvious options (`ffmpeg` and `python-ffmpeg`) but they didn't seem right: