Last active
December 23, 2019 12:21
-
-
Save xycui/43b2a24a7b3f26c72e45492ea1de5d7d to your computer and use it in GitHub Desktop.
Normalize the peak audio level to 0 dB with ffmpeg
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
certifi==2019.11.28
chardet==3.0.4
idna==2.8
requests==2.22.0
urllib3==1.25.7
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import platform | |
import os | |
import sys | |
import requests | |
import zipfile | |
import lzma | |
import tarfile | |
import glob | |
import re | |
import multiprocessing | |
import threading | |
import logging | |
from subprocess import Popen, PIPE | |
# Directory (relative to the working directory) that holds the downloaded
# and unpacked ffmpeg package.
FFMPEG_PATH = './ffmpeg'
# Download URL of the ffmpeg package; assigned by initEnv().
FFMPEG_URI = None
# Path of the ffmpeg executable; assigned by initEntryPoint().
FFMPEG_ENTRY = None
# CPU count reported by the OS; normalizeVolumeBatch() uses it to decide how
# many worker threads to start.
CORE_COUNT = multiprocessing.cpu_count()
# Compiled once at import time so it is not rebuilt for every output line.
# Matches a "... max_volume: <number> dB" line and captures the number.
VOLUME_PATTERN = re.compile(r"^.*?max_volume: (?P<volumeLevel>[\+\-]?[\d\.]+) dB$")
'''
FFMPEG_MAC_BINPATH='ffmpeg-latest-macos64-static/bin/ffmpeg'
FFMPEG_WIN32_BINPATH='ffmpeg-latest-win32-static/bin/ffmpeg.exe'
FFMPEG_WIN64_BINPATH='ffmpeg-latest-win64-static/bin/ffmpeg.exe'
'''
# 1. Determine the platform from sys.platform
# 2. Determine the architecture (32/64 bit) from platform.architecture()
def initEnv():
    """Pick the ffmpeg download URL that matches the running platform.

    The chosen URL is stored in the module-level ``FFMPEG_URI``. When
    ``sys.platform`` is none of the recognised values the global is left
    untouched.
    """
    global FFMPEG_URI

    currentPlatform = sys.platform
    if currentPlatform in ("linux", "linux2"):
        FFMPEG_URI = 'https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-32bit-static.tar.xz'
        return
    if currentPlatform == "darwin":
        FFMPEG_URI = 'https://ffmpeg.zeranoe.com/builds/macos64/static/ffmpeg-latest-macos64-static.zip'
        return
    if currentPlatform == "win32":
        # sys.platform is "win32" on both 32- and 64-bit Windows, so the
        # architecture string decides which build to fetch.
        if platform.architecture()[0] == '64bit':
            FFMPEG_URI = 'https://ffmpeg.zeranoe.com/builds/win64/static/ffmpeg-latest-win64-static.zip'
        else:
            FFMPEG_URI = 'https://ffmpeg.zeranoe.com/builds/win32/static/ffmpeg-latest-win32-static.zip'
# download the file and unzip | |
def initFfmpeg():
    """Make sure an ffmpeg binary is available under ``FFMPEG_PATH``.

    On the first run the package at ``FFMPEG_URI`` is downloaded into
    ``FFMPEG_PATH`` and unpacked there; a marker file named ``downloaded``
    is then written so later runs skip the download. Finally the binary is
    located via ``initEntryPoint()``.

    Raises:
        Exception: if ``FFMPEG_URI`` is None, i.e. no download URL was
            selected for this platform.
    """
    if FFMPEG_URI is None:
        # The URL is chosen for Linux, macOS and Windows; the previous
        # message wrongly claimed Linux was unsupported.
        raise Exception(
            "Unsupported platform '{0}': only Linux, macOS and Windows are supported".format(sys.platform)
        )

    successMarkFile = os.path.join(FFMPEG_PATH, 'downloaded')
    ffmpegPackageName = os.path.join(FFMPEG_PATH, os.path.split(FFMPEG_URI)[1])

    # exist_ok avoids the separate exists() check.
    os.makedirs(FFMPEG_PATH, exist_ok=True)

    if not os.path.exists(successMarkFile):
        print("Downloading the ffmpeg package. Please wait...")
        download(FFMPEG_URI, ffmpegPackageName)
        unzipFile(ffmpegPackageName, FFMPEG_PATH)
        # Written only after download and extraction both succeeded, so a
        # failed attempt is retried on the next run.
        with open(successMarkFile, "w") as output:
            output.write("done")

    initEntryPoint()
    print("Ffmpeg binary loaded.")
# search file in the given path | |
def initEntryPoint():
    """Locate the ffmpeg executable below ``FFMPEG_PATH``.

    The result is stored in the module-level ``FFMPEG_ENTRY``. If that
    global is already set the function returns without searching. The
    search tries, in order, ``**/bin/ffmpeg.exe``, ``**/bin/ffmpeg`` and
    ``**/ffmpeg`` and takes the first regular file found.

    Raises:
        Exception: if no matching file exists.
    """
    global FFMPEG_ENTRY
    if FFMPEG_ENTRY:
        return

    patterns = (
        os.path.join(FFMPEG_PATH, '**/bin/ffmpeg.exe'),
        os.path.join(FFMPEG_PATH, '**/bin/ffmpeg'),
        os.path.join(FFMPEG_PATH, '**/ffmpeg'),
    )
    for pattern in patterns:
        # isfile() keeps a directory that happens to be called "ffmpeg"
        # from being mistaken for the binary.
        match = next(
            (path for path in glob.iglob(pattern, recursive=True) if os.path.isfile(path)),
            None,
        )
        if match:
            FFMPEG_ENTRY = match
            break

    if not FFMPEG_ENTRY:
        raise Exception("ffmpeg binary not found!")

    if not FFMPEG_ENTRY.lower().endswith('.exe'):
        # The non-.exe binary may have been unpacked without its execute
        # bits; add them in-process instead of spawning an external chmod.
        currentMode = os.stat(FFMPEG_ENTRY).st_mode
        os.chmod(FFMPEG_ENTRY, currentMode | 0o111)
# unzip/unpack the file according to extension | |
def unzipFile(filename, targetDir):
    """Unpack the archive ``filename`` into ``targetDir``.

    The archive type is chosen from the file extension: ``.zip`` or
    ``.tar.xz``.

    Args:
        filename: path of the archive to unpack.
        targetDir: directory that receives the extracted files.

    Raises:
        ValueError: if the extension is neither ``.zip`` nor ``.tar.xz``.
            Previously such a file was silently skipped, which let the
            caller record a package as installed without unpacking it.
    """
    # NOTE(review): members are extracted without any path validation, so
    # this should only be used on archives from a trusted source.
    if filename.endswith('.zip'):
        with zipfile.ZipFile(filename, "r") as archive:
            archive.extractall(targetDir)
    elif filename.endswith('.tar.xz'):
        # tarfile handles the xz layer itself; no separate lzma stream needed.
        with tarfile.open(filename, "r:xz") as archive:
            archive.extractall(targetDir)
    else:
        raise ValueError("Unsupported archive type: {0}".format(filename))
# download file and show progress
def download(url, filename):
    """Download ``url`` to ``filename``, drawing a progress bar on stdout.

    When the server sends a usable ``content-length`` header the body is
    streamed in chunks and a 50-character bar is redrawn after each chunk.
    Otherwise the whole body is written in one go without a bar.

    Args:
        url: address to fetch.
        filename: path of the file to write (opened in binary mode).

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    # The context manager releases the streamed connection when done.
    with requests.get(url, stream=True) as response:
        # Fail before creating the file, so an HTTP error page is never
        # saved as if it were the package.
        response.raise_for_status()

        lengthHeader = response.headers.get('content-length')
        total = int(lengthHeader) if lengthHeader is not None else 0

        with open(filename, 'wb') as f:
            if total <= 0:
                # Size unknown (or reported as zero): no progress possible.
                f.write(response.content)
                return

            downloaded = 0
            chunkSize = max(total // 1000, 1024 * 1024)
            for data in response.iter_content(chunk_size=chunkSize):
                downloaded += len(data)
                f.write(data)
                # Clamp so the bar stays 50 wide even if more bytes arrive
                # than the header announced.
                done = min(50, int(50 * downloaded / total))
                sys.stdout.write('\r[{}{}]'.format('=' * done, ' ' * (50 - done)))
                sys.stdout.flush()
            sys.stdout.write('\n')
def detectMaxVolumeLevel(audioFile):
    """Return the ``max_volume`` value (in dB) ffmpeg reports for a file.

    Runs ffmpeg with the ``volumedetect`` audio filter and scans its stderr
    for a line matching ``VOLUME_PATTERN``.

    Args:
        audioFile: path of the file to analyse.

    Returns:
        The reported level as a float, or 0 when no matching line is found.
        If several lines match, the last one wins.
    """
    # Imported here because the file-level import only brings in Popen/PIPE.
    from subprocess import DEVNULL

    ret = 0
    # sys.stderr (or its encoding) can be None when no console is attached.
    encoding = getattr(sys.stderr, "encoding", None) or "utf-8"
    command = [FFMPEG_ENTRY, "-i", audioFile, "-filter:a", "volumedetect", "-f", "null", "/dev/null"]
    # stdout is never read, so discard it instead of leaving an undrained pipe.
    with Popen(command, stdout=DEVNULL, stderr=PIPE) as proc:
        for rawLine in proc.stderr:
            # Output arrives as bytes such as b'xxxx\r\n'; decode and trim.
            line = rawLine.decode(encoding, 'ignore').strip()
            m = VOLUME_PATTERN.match(line)
            if m:
                ret = float(m.group('volumeLevel'))
    return ret
def normalizeVolume(audioFile, outputFileName):
    """Write a copy of ``audioFile`` whose peak level is shifted to 0 dB.

    The gain applied is the negation of the value returned by
    ``detectMaxVolumeLevel``; ffmpeg's ``volume`` filter does the work and
    overwrites ``outputFileName`` if it exists (``-y``).

    Args:
        audioFile: path of the input file.
        outputFileName: path of the file ffmpeg should write.
    """
    enableLog = False
    tuneLevel = 0 - detectMaxVolumeLevel(audioFile)
    print("Tuning Audio file: {0} with level: {2}{1} dB".format(audioFile, str(tuneLevel), "" if tuneLevel <= 0 else "+"))

    command = [FFMPEG_ENTRY, "-y", "-i", audioFile, "-filter:a", "volume={0}dB".format(str(tuneLevel)), outputFileName]
    with Popen(command, stdout=PIPE, stderr=PIPE) as proc:
        # communicate() drains both pipes until ffmpeg exits. Leaving them
        # unread lets the child block on a full pipe or write to a pipe the
        # parent has already closed.
        _, errorOutput = proc.communicate()
    failed = proc.returncode != 0

    if enableLog or failed:
        # sys.stderr (or its encoding) can be None when no console is attached.
        encoding = getattr(sys.stderr, "encoding", None) or "utf-8"
        for rawLine in errorOutput.splitlines():
            print(rawLine.decode(encoding, 'ignore').strip())

    if failed:
        # Do not claim success when ffmpeg reported an error.
        print("Failed tuning: {0} (ffmpeg exit code {1})".format(audioFile, proc.returncode))
        return
    print("Finish tuning: {0}".format(audioFile))
def normalizeVolumeFromFileList(audioFiles: list):
    """Normalize every file in ``audioFiles``, one after another.

    Each file is written to the path that ``getOutputFileName`` returns
    for it.
    """
    for sourceFile in audioFiles:
        targetFile = getOutputFileName(sourceFile)
        normalizeVolume(sourceFile, targetFile)
def normalizeVolumeBatch(inputFolder):
    """Normalize every regular file directly inside ``inputFolder``.

    With a single core the files are processed sequentially on the calling
    thread. Otherwise the file list is cut into ``CORE_COUNT`` slices and
    each non-empty slice is handled by its own thread; the function returns
    once all threads have finished.
    """
    candidates = (os.path.join(inputFolder, name) for name in os.listdir(inputFolder))
    files = [path for path in candidates if os.path.isfile(path)]

    if CORE_COUNT == 1:
        for path in files:
            normalizeVolume(path, getOutputFileName(path))
        return

    workers = []
    for chunk in split_list(files, CORE_COUNT):
        if not chunk:
            # Fewer files than cores: nothing to do for this slice.
            continue
        worker = threading.Thread(target=normalizeVolumeFromFileList, args=(chunk,))
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()
def getOutputFileName(inputFileName):
    """Return the output path for ``inputFileName`` and create its folder.

    The output keeps the input's base name and lives in a ``processed``
    sub-directory next to the input file. That directory is created if it
    does not exist yet.

    Args:
        inputFileName: path of the input file.

    Returns:
        ``<directory of input>/processed/<base name of input>``.

    Raises:
        OSError: if the ``processed`` directory cannot be created (for
            example missing permissions, or a non-directory of that name
            already exists).
    """
    outputDir = os.path.join(os.path.dirname(inputFileName), "processed")
    # exist_ok makes concurrent calls from several worker threads safe
    # without a bare except that would also hide real failures.
    os.makedirs(outputDir, exist_ok=True)
    return os.path.join(outputDir, os.path.basename(inputFileName))
def split_list(alist, wanted_parts=1):
    """Cut ``alist`` into ``wanted_parts`` consecutive slices.

    Slice boundaries are placed at ``i * len(alist) // wanted_parts``, so
    slice sizes differ by at most one element and some slices are empty when
    there are fewer elements than parts.

    Returns:
        A list containing ``wanted_parts`` slices of ``alist``, in order.
    """
    total = len(alist)
    pieces = []
    for part in range(wanted_parts):
        start = part * total // wanted_parts
        stop = (part + 1) * total // wanted_parts
        pieces.append(alist[start:stop])
    return pieces
# Script entry: prepare ffmpeg, ask for a file or folder, then normalize it.
initEnv()
initFfmpeg()
print(FFMPEG_ENTRY)
print("\n")
print("Drag/input the audio file/folder here for the path.")
print("To use current folder. Press ENTER directly...")
# A path dropped onto a terminal is often wrapped in quotes and/or followed
# by a space; remove both so os.path can resolve it.
unprocessedPath = input("Audio file/folder path: ").strip()
if len(unprocessedPath) >= 2 and unprocessedPath[0] == unprocessedPath[-1] and unprocessedPath[0] in "\"'":
    unprocessedPath = unprocessedPath[1:-1].strip()
if not unprocessedPath:
    unprocessedPath = "./"
if os.path.isdir(unprocessedPath):
    normalizeVolumeBatch(unprocessedPath)
elif os.path.isfile(unprocessedPath):
    outputName = getOutputFileName(unprocessedPath)
    normalizeVolume(unprocessedPath, outputName)
else:
    print("Path not valid")
# Keep the console window open until the user confirms.
input("Press ENTER to exit...")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment