Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

Created February 6, 2021 22:41
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jpwsutton/28883e65292d0bfb8effe8357712e701 to your computer and use it in GitHub Desktop.
Save jpwsutton/28883e65292d0bfb8effe8357712e701 to your computer and use it in GitHub Desktop.
A very rough Python script to synchronise dubbed audio from different videos.
This script will take a pair of VCD video files as well as a high-quality Blu-ray rip,
and will synchronise the audio and add it as a new track.
# Import Libraries
from pathlib import Path
import os
import math
import librosa #You will need llvm installed as well: brew install llvm
import subprocess
import numpy as np
import argparse
import ffmpeg
def import_audio(filename, output_file, offset=0):
    """Convert the audio of *filename* to a mono 44.1 kHz float32 WAV file.

    If *output_file* already exists it is reused and ffmpeg is not invoked.

    Args:
        filename: Path of the media file to read.
        output_file: Path of the WAV file to create.
        offset: Amount, in milliseconds, passed to ffmpeg's ``atrim`` filter
            as the start point. ``0`` (the default) skips trimming.

    Returns:
        ``output_file``, unchanged.
    """
    if os.path.isfile(output_file):
        # A previous run already produced this file; skip the conversion.
        return output_file

    stream = ffmpeg.input(str(filename))
    if offset != 0:
        offset = str(offset) + 'ms'
        print(f'Trimming the audio! {offset}')
        # After trimming, reset timestamps so the output starts at t=0.
        stream = (
            stream
            .filter_('atrim', start=offset)
            .filter_('asetpts', 'PTS-STARTPTS')
        )

    stream = ffmpeg.output(
        stream, str(output_file), acodec='pcm_f32le', ac=1, ar='44100'
    )
    # Building the output node only describes the job; run() executes it.
    ffmpeg.run(stream, quiet=True)
    return output_file
def join_audio_streams(audio_files, workingDirectory):
    """Concatenate *audio_files*, in order, into ``merged.wav`` using sox.

    Args:
        audio_files: Iterable of audio file paths to join.
        workingDirectory: ``pathlib.Path`` of the directory that receives
            ``merged.wav``.

    Returns:
        Path of the merged file (``workingDirectory / 'merged.wav'``).
    """
    output_filename = workingDirectory / 'merged.wav'
    # Pass an argument list rather than a shell string so that paths
    # containing spaces or shell metacharacters reach sox intact.
    command = ['sox', *(str(item) for item in audio_files), str(output_filename)]
    print(' '.join(command))
    # The exit status of sox is not checked.
    subprocess.run(command, check=False)
    return output_filename
def generate_silence(ms, directory):
    """Create a mono WAV file of silence lasting *ms* milliseconds using sox.

    The file is named ``silence<ms>.wav`` and written at ``SAMPLE_RATE``
    (a module-level constant that must be defined elsewhere in this file).

    Args:
        ms: Length of the silence in milliseconds.
        directory: ``pathlib.Path`` of the directory to write the file into.

    Returns:
        Path of the generated silence file.
    """
    filename = "silence" + str(ms) + ".wav"
    filepath = directory / filename
    # Pass an argument list rather than a shell string so that paths
    # containing spaces or shell metacharacters reach sox intact.
    command = [
        'sox', '-n',
        '-r', str(SAMPLE_RATE),
        '-c', '1',
        str(filepath),
        'trim', '0.0', str(ms / 1000),  # sox takes the length in seconds
    ]
    print(' '.join(command))
    # The exit status of sox is not checked.
    subprocess.run(command, check=False)
    return filepath
def split_and_import_audio(filename, offset=0):
    """Load up to 60 seconds of audio from *filename* with librosa.

    The file is first converted by ``import_audio`` to a sibling file with
    the suffix ``.converted.wav``; that WAV is what librosa reads.

    Args:
        filename: Path (``pathlib.Path`` or ``str``) of the media file.
        offset: Passed to ``librosa.load`` as ``offset`` (the position at
            which reading starts).

    Returns:
        The ``(samples, sample_rate)`` tuple returned by ``librosa.load``.
    """
    # Coerce so plain strings work too; with_suffix() needs a Path.
    output_filename = Path(filename).with_suffix('.converted.wav')
    output = import_audio(filename, output_filename)
    x, sr = librosa.load(output, duration=60, offset=offset)
    return x, sr
def process_signal(o):
    """Turn a 1-D strength envelope into a smoothed peak-indicator signal.

    The input is standardised (zero mean, unit variance), thresholded so
    that samples more than two standard deviations above the mean become
    1.0 and everything else 0.0, and then each peak is given a decaying
    tail (factor 0.9 per sample) in both directions.

    Args:
        o: 1-D array-like of numbers.

    Returns:
        A new float ``numpy.ndarray`` of the same length. A constant or
        empty input yields all zeros (there are no peaks to mark).
    """
    o = np.asarray(o, dtype=float)
    if o.size == 0:
        return np.zeros_like(o)
    spread = np.std(o)
    if spread == 0:
        # Constant signal: standardising would divide by zero.
        return np.zeros_like(o)

    o = (o - np.mean(o)) / spread
    o = np.where(o > 2, 1.0, 0.0)

    # Forward pass: let each peak decay towards later samples.
    for i in range(1, len(o)):
        o[i] = max(o[i], o[i - 1] * 0.9)

    # Backward pass: let each peak decay towards earlier samples. The range
    # ends at -1 so that index 0 is included.
    for i in range(len(o) - 2, -1, -1):
        o[i] = max(o[i], o[i + 1] * 0.9)

    return o
def find_offset(x0, x1, max_shift=600):
    """Find the circular shift of *x1* that best matches *x0*.

    Every integer shift in ``range(-max_shift, max_shift)`` is scored with
    ``measure_error`` and the shift with the smallest error wins (ties go
    to the smallest shift).

    Args:
        x0: Reference signal.
        x1: Signal that is shifted against *x0*.
        max_shift: Half-width of the search window, in samples.

    Returns:
        ``(-best_shift, error)``: the negated winning shift and its error.
    """
    # Tuples compare by error first, then by shift.
    error, offset = min(
        (measure_error(x0, x1, shift), shift)
        for shift in range(-max_shift, max_shift)
    )
    return -offset, error
def measure_error(x0, x1, offset):
    """Return the mean squared difference between *x0* and a shifted *x1*.

    Both signals are cut to the length of the shorter one, then *x1* is
    shifted by *offset* samples with ``numpy.roll`` (samples pushed past
    one end re-enter at the other).

    Args:
        x0: 1-D array-like reference signal.
        x1: 1-D array-like signal to shift.
        offset: Number of samples to roll *x1* by.

    Returns:
        The mean of the squared element-wise differences.
    """
    common_len = min(len(x0), len(x1))
    # Work in float so integer inputs (e.g. uint8) cannot wrap around when
    # differenced or squared.
    reference = np.asarray(x0[:common_len], dtype=float)
    shifted = np.roll(np.asarray(x1[:common_len], dtype=float), offset)
    diff = reference - shifted
    err = np.sum(diff ** 2) / len(diff)
    return err
def calculate_correct_offset(hq_x, hq_sr, lq_x, lq_sr):
    """Estimate the time offset between two recordings of the same audio.

    Both signals are standardised, reduced to onset-strength envelopes,
    cleaned up with ``process_signal`` and aligned with ``find_offset``.

    Args:
        hq_x: Samples of the high-quality recording.
        hq_sr: Sample rate of *hq_x*.
        lq_x: Samples of the low-quality recording.
        lq_sr: Sample rate of *lq_x*.

    Returns:
        The offset reported by ``find_offset`` scaled by
        ``1024 / SAMPLE_RATE * 1000`` (``SAMPLE_RATE`` is a module-level
        constant that must be defined elsewhere in this file).
    """
    # Normalise the two signals so that they are the same average
    # amplitude (volume)
    lq_x = (lq_x - np.mean(lq_x)) / np.std(lq_x)
    hq_x = (hq_x - np.mean(hq_x)) / np.std(hq_x)

    # Calculate the 'onset strength' of the files, ie where the parts start.
    # The signal is passed as ``y=`` because recent librosa releases accept
    # it by keyword only.
    lq_o = librosa.onset.onset_strength(y=lq_x, sr=lq_sr)
    hq_o = librosa.onset.onset_strength(y=hq_x, sr=hq_sr)

    # Process the signal of the two files
    lq_s = process_signal(lq_o)
    hq_s = process_signal(hq_o)

    # Calculate the offset
    offset, error = find_offset(lq_s, hq_s)
    print(f'Raw offset: {offset}, error: {error}')

    # NOTE(review): presumably converts envelope frames to milliseconds
    # (1024 samples per frame at SAMPLE_RATE) — confirm this matches the
    # hop length and sample rate librosa actually used.
    offset_adj = ((offset * 1024) / SAMPLE_RATE) * 1000
    return offset_adj
if __name__ == '__main__':
# Command-line entry point: parse the arguments, estimate each low-quality
# file's offset against the high-quality file, then trim or pad the audio
# and merge it with sox.
# NOTE(review): this listing has lost its indentation and appears to be
# missing statements (see the notes below); restore them from the original
# script before running.
# Initiate the parser
parser = argparse.ArgumentParser()
parser.add_argument("hqfile", type=str, help="The High Quality File to add the new audio stream to.")
parser.add_argument("lqfiles", type=str, help="The Low Quality Files to rip the audio stream from, can be comma separated.")
parser.add_argument("-V", "--version", help="show program version", action="store_true")
# Read arguments from the command line
args = parser.parse_args()
# Check for --version or -V
# NOTE(review): nothing visible here stops the script after printing the
# version.
if args.version:
print("This is audio_dub_sync version 0.1")
hq_path = Path(args.hqfile)
print(f'High Quality File: {hq_path}')
print(f'Low Quality Files:')
lq_files = []
# NOTE(review): the body of this loop is not visible in this listing, so
# lq_files is never filled here even though it is iterated and indexed
# below. Presumably each entry was converted to a Path and appended — confirm.
for lqfile in args.lqfiles.split(','):
# Iterate through the Low Quality files, and find offsets against the HQ version
# hq_offset is the position in the HQ file at which the next comparison
# starts; it is passed to split_and_import_audio below.
hq_offset = 0
# NOTE(review): lq_offsets is read in the trimming loop further down but
# nothing visible ever appends to it.
lq_offsets = []
#lq_audio_files = []
for lq_file in lq_files:
print(f'Synchronising {lq_file}.')
hq_x, hq_sr = split_and_import_audio(hq_path, hq_offset)
lq_x, lq_sr = split_and_import_audio(lq_file, 0)
# NOTE(review): newer librosa releases name this parameter `path` rather
# than `filename` — confirm against the installed version.
lq_duration = librosa.get_duration(filename=str(lq_file))
raw_offset = calculate_correct_offset(hq_x, hq_sr, lq_x, lq_sr )
# abs_offset is not used anywhere in the visible code.
abs_offset = abs(raw_offset)
trunc_offset = math.trunc(raw_offset)
print(f'Raw: {raw_offset}, trunc: {trunc_offset}')
offset = trunc_offset
if(offset > 0):
offset = offset * 2 # I have no idea why, but totoro made me do this!
print(f'Duration of {lq_file} is {lq_duration} seconds.')
if(offset < 0):
offset = abs(offset)
# Advance the HQ position by this file's duration minus the offset;
# offset / 1000 converts milliseconds to seconds.
hq_offset += (lq_duration - offset / 1000)
print(f'Setting HQ Offset to: {hq_offset}')
# trimmed_audio_files is not used anywhere in the visible code.
trimmed_audio_files = []
#lq_x, lq_sr = split_and_import_audio(lq_files[0], 0)
# Intermediate files are written next to the first low-quality file.
workingDirectory = Path(os.path.dirname(lq_files[0]))
# NOTE(review): audio_to_join is printed and handed to sox below, but
# nothing visible ever appends the trimmed or silence files to it.
audio_to_join = []
for off in range(len(lq_offsets)):
if(lq_offsets[off] < 0):
# Standard Offset, trim audio from clip
# NOTE(review): the negative offset is passed to import_audio unchanged —
# confirm a negative trim start is what was intended.
trimmed_fn = lq_files[off].with_suffix('.trimmed.wav')
import_audio(lq_files[off],trimmed_fn, lq_offsets[off])
if(lq_offsets[off] > 0):
# Strange, we need to add some silence as that's the best we can do right now.
silence_file = generate_silence(lq_offsets[off] , workingDirectory)
print(f'Saved Silence file of {lq_offsets[off]}ms to {silence_file}')
print(f'Generated List of files to join: {audio_to_join}')
merged_filename = join_audio_streams(audio_to_join, workingDirectory)
print(f'Merged Audio File: {merged_filename}')
Copy link

m000 commented Feb 11, 2021

Which ffmpeg package are you using?

I tried the obvious options (ffmpeg and python-ffmpeg) but they didn't seem right:

AttributeError: module 'ffmpeg' has no attribute 'input'

Copy link

m000 commented Feb 11, 2021

It was the third obvious choice: ffmpeg-python 🤦🏻‍♂️

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment