-
-
Save puzzledsam/c0731702a9eab244afacbcb777c9f5e9 to your computer and use it in GitHub Desktop.
Detect intros in two video files using FFmpeg and FPCALC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Intro Detection based on FPCALC fingeprints (WIP) | |
Most of the intro detection algorithm is derived from VictorBitca/matcher, which was originally written in Go. | |
https://github.com/VictorBitca/matcher | |
""" | |
### IMPORTS ### | |
import argparse | |
import os | |
import subprocess | |
import json | |
import matplotlib.pyplot as plt # Not required, just wanted to get a graph of the Hamming distances | |
### PARAMETERS & CONSTANTS ### | |
# Default executable used to invoke FFmpeg; can be overridden with -ff/--ffmpeg
FFMPEG_COMMAND = "ffmpeg"
# Default executable used to invoke fpcalc; can be overridden with -fp/--fpcalc
FPCALC_COMMAND = "fpcalc"
# Relative path, so the folder is created in the current working directory.
# The Hamming distance plot (result.png) is also saved in this folder.
TEMP_STORE = "tempIntroDetection" # Folder to store audio temporarily
### CLASSES ### | |
# No classes currently, but that might change | |
### FUNCTIONS ### | |
# Keep integer in specified range | |
def clip(val, min, max):
    """Clamp `val` to the closed range [`min`, `max`].

    The lower bound is tested first, so it wins if the bounds are inverted.
    """
    if val < min:
        return min
    return max if val > max else val
# Calculate Hamming distance between two integers (bit difference)
def getHammingDistance(n1, n2):
    """Return the number of bit positions in which `n1` and `n2` differ.

    Args:
        n1: First integer (a fingerprint item).
        n2: Second integer (a fingerprint item).

    Returns:
        The count of set bits in `n1 ^ n2`.
    """
    x = n1 ^ n2
    if x < 0:
        # A negative XOR means the operands are signed values with different
        # sign bits. Python integers are unbounded, so reinterpret the result
        # as a 32-bit two's-complement word (the item width the rest of this
        # file assumes) instead of silently reporting a distance of 0.
        x &= 0xFFFFFFFF
    return bin(x).count("1")
# Calculate the similarity of two fingerprints | |
def compareFingerprints(f1, f2):
    """Score how similar two fingerprints are.

    Args:
        f1: Sequence of integer fingerprint items.
        f2: Sequence of integer fingerprint items.

    Returns:
        0 if the fingerprints are empty or differ in length (nothing can be
        compared). Otherwise 1 minus the fraction of differing bits, assuming
        each item is a 32 bit integer: 1.0 means identical.
    """
    # Guarding the empty case avoids a division by zero further down; the
    # sliding window in getBestOffset produces empty slices for tiny inputs.
    if len(f1) == 0 or len(f1) != len(f2):
        return 0
    dist = sum(getHammingDistance(a, b) for a, b in zip(f1, f2))
    return 1 - dist / (len(f1) * 32)  # Assuming 32 bit integer
# Slide fingerprints to find best offset | |
def getBestOffset(f1, f2):
    """Slide the two fingerprints against each other and return the best offset.

    One similarity score (see compareFingerprints) is computed for each of
    `len(f1) + 1` relative alignments, and the alignment with the highest
    score wins (the first one on ties, because of `list.index`).

    Only `len(f1)` is consulted; `f2` is assumed to be at least as long.
    If a slice of `f2` comes out shorter than the matching slice of `f1`,
    compareFingerprints scores that alignment 0.

    Returns:
        The offset as a float (true division is used). Index 0 of the score
        list maps to `+len(f1) / 2` and the last index to `-len(f1) / 2`.
        A positive value means the matching content starts that many items
        later in `f1` than in `f2`.
    """
    length = len(f1)
    iterations = length + 1
    # Counts down once per iteration; the loop is in its first phase while diff >= 0
    diff = (length / 2) - 1
    # a: start of the compared window in f1; |a - b| is the window size
    a = length / 2
    b = length - 1
    # x: start of the compared window in f2
    x = 0
    # NOTE(review): y is updated in the loop but never read
    y = (length / 2) - 1
    output = []
    for i in range(iterations):
        upper = abs(a - b)
        # a, x and upper may be floats here, hence the int() casts for slicing
        output.append(compareFingerprints(f1[int(a):int(a + upper)], f2[int(x):int(x + upper)]))
        a = clip(a - 1, 0, length - 1)
        if diff < 0:
            # Second phase: f1's window start is pinned at 0 by clip, f2's
            # window start moves forward and the window shrinks via b
            b = clip(b - 1, 0, length - 1)
            x = clip(x + 1, 0, length - 1)
            y = clip(y, 0, length - 1)
        else:
            # First phase: f2's window stays at 0 while f1's window start
            # (a, decremented above) moves towards 0, growing the window
            b = clip(b, 0, length - 1)
            x = clip(x, 0, length - 1)
            y = clip(y + 1, 0, length - 1)
        diff = diff - 1
        #print(a, b, x, output[i]) # Just so I could debug
    index = output.index(max(output))
    return (iterations - 1) / 2 - index
# Align the fingerprints according to the calculated offset | |
def getAlignedFingerprints(offset, f1, f2):
    """Trim both fingerprints so that matching items share the same index.

    For a non-negative offset the first `offset` items of `f1` and the last
    `offset` items of `f2` are dropped; for a negative offset the roles are
    swapped. The offset may be a float; it is truncated for slicing.

    Returns:
        A tuple `(trimmed_f1, trimmed_f2)`.
    """
    shift = abs(offset)
    if offset >= 0:
        return f1[int(shift):], f2[:int(len(f2) - shift)]
    return f1[:int(len(f1) - shift)], f2[int(shift):]
# Find the intro region based on Hamming distances | |
def findContiguousRegion(arr, upperLimit):
    """Locate the intro region in a list of Hamming distances.

    An index qualifies when its own distance is below `upperLimit` and
    nextOnesAreAlsoSmall() accepts the items that follow it.

    Returns:
        `(start, end)`: the first and the last qualifying index, or
        `(-1, -1)` when no index qualifies. Indices in between are not
        required to qualify themselves.
    """
    hits = [
        position
        for position, distance in enumerate(arr)
        if distance < upperLimit and nextOnesAreAlsoSmall(arr, position, upperLimit)
    ]
    if not hits:
        return -1, -1
    return hits[0], hits[-1]
# Look at next elements in the array and determine if they also fall below the upper limit | |
def nextOnesAreAlsoSmall(arr, index, upperLimit):
    """Tell whether the three items after `index` average below `upperLimit`.

    Returns False when fewer than three items follow `index`.
    """
    if index + 3 >= len(arr):
        return False
    following_total = arr[index + 1] + arr[index + 2] + arr[index + 3]
    return following_total / 3 < upperLimit
### SCRIPT ### | |
def cli():
    """Build the argument parser and parse the process's command line.

    Returns:
        The parsed namespace with `input`, `input2`, `ffmpeg` and `fpcalc`.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="Detect intros automatically with the power of... fingerprints?",
    )

    # The two video files to compare are mandatory
    required_inputs = (
        (("-i", "--input"), "Input video file"),
        (("-i2", "--input2"), "Input video file 2"),
    )
    for flags, help_text in required_inputs:
        parser.add_argument(*flags, type=str, required=True, help=help_text)

    # The external tools fall back to the module-level default commands
    tool_paths = (
        (("-ff", "--ffmpeg"), FFMPEG_COMMAND, "FFMPEG Path"),
        (("-fp", "--fpcalc"), FPCALC_COMMAND, "FPCALC Path"),
    )
    for flags, fallback, help_text in tool_paths:
        parser.add_argument(*flags, type=str, default=fallback, help=help_text)

    return parser.parse_args()
def _extractAudioChunk(ffmpeg, videoPath, wavPath, seconds):
    """Write the first `seconds` seconds of a video's audio to a WAV file.

    The audio is converted to 16 kHz mono signed 16-bit PCM.

    Raises:
        RuntimeError: if FFmpeg exits with a non-zero status.
    """
    command = [
        ffmpeg,
        "-y",  # Overwrite leftovers; the overwrite prompt would be invisible since output is captured
        "-t", str(seconds),
        "-i", videoPath,
        "-vn",  # Audio only
        "-ac", "1",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        wavPath,
    ]
    result = subprocess.run(command, stdin=subprocess.DEVNULL,
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if result.returncode != 0:
        raise RuntimeError("FFmpeg failed for {0} (exit code {1}):\n{2}".format(
            videoPath, result.returncode, result.stdout.decode(errors="replace")))


def _fingerprintAudio(fpcalc, wavPath, seconds):
    """Run fpcalc on a WAV file and return its parsed JSON output.

    Raises:
        RuntimeError: if fpcalc exits with a non-zero status.
    """
    result = subprocess.run([fpcalc, wavPath, "-length", str(seconds), "-raw", "-json"],
                            stdout=subprocess.PIPE)
    if result.returncode != 0:
        raise RuntimeError("fpcalc failed for {0} (exit code {1})".format(
            wavPath, result.returncode))
    return json.loads(result.stdout)


def main(args):
    """Detect the intro shared by two video files and print its location in each.

    Steps: extract the first 300 seconds of audio from both videos with
    FFmpeg, fingerprint them with fpcalc, find the offset that best aligns
    the fingerprints, then look for a run of low Hamming distances.
    A plot of the distances is saved as result.png inside TEMP_STORE.

    Args:
        args: Parsed command line namespace providing `input`, `input2`,
            `ffmpeg` and `fpcalc`.

    Raises:
        RuntimeError: if FFmpeg or fpcalc fails, or if the fingerprints are
            too short to be compared.
    """
    chunkSeconds = 300  # Length in seconds of the audio sample taken from each video
    maxHammingDistance = 8  # Upper Hamming distance limit to be considered similar
    minIntroSeconds = 10  # Shorter common regions are not considered an intro

    # Create a directory to hold the audio files
    os.makedirs(TEMP_STORE, exist_ok=True)
    print("\nStarting process...")
    audio1_save_path = os.path.join(TEMP_STORE, "audio1.wav")
    audio2_save_path = os.path.join(TEMP_STORE, "audio2.wav")
    try:
        print("\nGetting audio chunk from both videos...", end = " ", flush = True)
        _extractAudioChunk(args.ffmpeg, args.input, audio1_save_path, chunkSeconds)
        print("50%", end = " ", flush = True)
        _extractAudioChunk(args.ffmpeg, args.input2, audio2_save_path, chunkSeconds)
        print("Done!")
        print("\nFingerprinting audio...", end = " ", flush = True)
        # Using 300 second length to get a more accurate fingerprint, but it's not required
        audio_fingerprint1 = _fingerprintAudio(args.fpcalc, audio1_save_path, chunkSeconds)
        print("50%", end = " ", flush = True)
        audio_fingerprint2 = _fingerprintAudio(args.fpcalc, audio2_save_path, chunkSeconds)
        print("Done!")
    finally:
        # Get rid of the temporary audio files, even when a tool failed
        for leftover in (audio1_save_path, audio2_save_path):
            if os.path.exists(leftover):
                os.remove(leftover)

    # This is where the fun begins
    print("\nAnalyzing fingerprints...")
    fprint1 = audio_fingerprint1["fingerprint"]
    fprint2 = audio_fingerprint2["fingerprint"]
    if len(fprint1) == 0 or len(fprint2) == 0:
        raise RuntimeError("fpcalc returned an empty fingerprint")

    # Time covered by one fingerprint item. Use the duration fpcalc reports
    # (capped at the sample length) so videos shorter than the sample are
    # handled too; computed before the fingerprints are trimmed below.
    reportedDuration = audio_fingerprint1.get("duration") or chunkSeconds
    secondsPerSample = min(chunkSeconds, reportedDuration) / len(fprint1)

    # getBestOffset expects two fingerprints of the same, even length, so
    # cut both down to their common length and drop one more item if odd
    commonLength = min(len(fprint1), len(fprint2))
    commonLength -= commonLength % 2
    if commonLength < 4:
        raise RuntimeError("Fingerprints are too short to be compared")
    fprint1 = fprint1[0:commonLength]
    fprint2 = fprint2[0:commonLength]

    offset = getBestOffset(fprint1, fprint2)
    print("The calculated fingerprint offset is {0}".format(offset))
    f1, f2 = getAlignedFingerprints(offset, fprint1, fprint2)
    hammingDistances = [getHammingDistance(item1, item2) for item1, item2 in zip(f1, f2)]

    xaxis = list(range(len(hammingDistances)))
    plt.plot(xaxis, hammingDistances)
    plt.title("Hamming Distances between audio chunks from two video files")
    plt.xlabel("Fingerprint index (between 0 and {0})".format(len(hammingDistances)))
    plt.ylabel("Distances")
    plt.savefig(os.path.join(TEMP_STORE, "result.png"))
    print("Done!")
    print("\nHamming distances:\n{0}".format(hammingDistances)) # For debugging

    start, end = findContiguousRegion(hammingDistances, maxHammingDistance)
    offsetInSeconds = offset * secondsPerSample
    commonRegionStart = start * secondsPerSample
    commonRegionEnd = end * secondsPerSample
    print("\nSeconds per sample: {0}\nOffset in seconds: {1}\nCommon region starts at {2} and ends at {3}".format(secondsPerSample, offsetInSeconds, commonRegionStart, commonRegionEnd))

    if (start < 0) or (end < 0):
        # No similar region was found at all
        firstFileRegionStart = firstFileRegionEnd = 0.0
        secondFileRegionStart = secondFileRegionEnd = 0.0
    elif (commonRegionEnd - commonRegionStart) < minIntroSeconds:
        # -1 means intro does not exist (common region too short)
        firstFileRegionStart = firstFileRegionEnd = -1.0
        secondFileRegionStart = secondFileRegionEnd = -1.0
    elif offset >= 0:
        # The aligned region starts `offset` items into the first file
        firstFileRegionStart = commonRegionStart + offsetInSeconds
        firstFileRegionEnd = commonRegionEnd + offsetInSeconds
        secondFileRegionStart = commonRegionStart
        secondFileRegionEnd = commonRegionEnd
    else:
        # Negative offset: the aligned region starts later in the second file
        firstFileRegionStart = commonRegionStart
        firstFileRegionEnd = commonRegionEnd
        secondFileRegionStart = commonRegionStart - offsetInSeconds
        secondFileRegionEnd = commonRegionEnd - offsetInSeconds
    print("\nFound intro ranges (in seconds) are:\n First File:", [firstFileRegionStart, firstFileRegionEnd], "\n Second File:", [secondFileRegionStart, secondFileRegionEnd]) # Magic is done!
# Run the detection only when the file is executed as a script, not when imported
if __name__ == "__main__":
    main(cli())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment