Skip to content

Instantly share code, notes, and snippets.

@xycui
Last active December 23, 2019 12:21
Show Gist options
  • Save xycui/43b2a24a7b3f26c72e45492ea1de5d7d to your computer and use it in GitHub Desktop.
Save xycui/43b2a24a7b3f26c72e45492ea1de5d7d to your computer and use it in GitHub Desktop.
normalize the audio level to 0 with ffmpeg
certifi==2019.11.28
chardet==3.0.4
idna==2.8
requests==2.22.0
urllib3==1.25.7
import platform
import os
import sys
import requests
import zipfile
import lzma
import tarfile
import glob
import re
import multiprocessing
import threading
import logging
from subprocess import Popen, PIPE
# Directory, relative to the working directory, that receives the ffmpeg package.
FFMPEG_PATH = './ffmpeg'
# Download URL of the ffmpeg package; assigned by initEnv().
FFMPEG_URI = None
# Path of the ffmpeg executable; assigned by initEntryPoint().
FFMPEG_ENTRY = None
# CPU count reported by the OS; normalizeVolumeBatch() uses it to split the work.
CORE_COUNT = multiprocessing.cpu_count()
# Compiled once at import time: matches the "max_volume: <number> dB" line of
# ffmpeg's volumedetect report and captures the number as "volumeLevel".
VOLUME_PATTERN = re.compile(
    r"^.*?max_volume: (?P<volumeLevel>[\+\-]?[\d\.]+) dB$"
)
# The string below is a disabled block of fixed binary paths; nothing reads it.
'''
FFMPEG_MAC_BINPATH='ffmpeg-latest-macos64-static/bin/ffmpeg'
FFMPEG_WIN32_BINPATH='ffmpeg-latest-win32-static/bin/ffmpeg.exe'
FFMPEG_WIN64_BINPATH='ffmpeg-latest-win64-static/bin/ffmpeg.exe'
'''
# 1. Determine the operating system from sys.platform
# 2. Determine the architecture from platform.architecture()
def initEnv():
    """Select the ffmpeg package URL for the running platform.

    Stores the result in the module-level FFMPEG_URI. Linux always gets the
    32-bit static tarball, macOS the 64-bit zip, and Windows the 32- or 64-bit
    zip depending on platform.architecture(). On any other platform
    FFMPEG_URI is left unchanged.
    """
    global FFMPEG_URI
    windows32Package = 'https://ffmpeg.zeranoe.com/builds/win32/static/ffmpeg-latest-win32-static.zip'
    windows64Package = 'https://ffmpeg.zeranoe.com/builds/win64/static/ffmpeg-latest-win64-static.zip'
    macPackage = 'https://ffmpeg.zeranoe.com/builds/macos64/static/ffmpeg-latest-macos64-static.zip'
    linuxPackage = 'https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-32bit-static.tar.xz'

    currentPlatform = sys.platform
    if currentPlatform in ("linux", "linux2"):
        FFMPEG_URI = linuxPackage
        return
    if currentPlatform == "darwin":
        FFMPEG_URI = macPackage
        return
    if currentPlatform == "win32":
        # The interpreter's bitness decides which Windows build is fetched.
        is64Bit = platform.architecture()[0] == '64bit'
        FFMPEG_URI = windows64Package if is64Bit else windows32Package
# download the file and unzip
def initFfmpeg():
    """Make sure an ffmpeg binary is available under FFMPEG_PATH.

    On the first run the package named by FFMPEG_URI is downloaded into
    FFMPEG_PATH and unpacked there, then a ``downloaded`` marker file is
    written so later runs skip the download. In every case the executable is
    then located through initEntryPoint().

    Raises:
        Exception: if FFMPEG_URI is None, i.e. initEnv() did not select a
            package for this platform.
    """
    if FFMPEG_URI is None:
        # initEnv() handles Linux as well, so the message lists all three.
        raise Exception("Only support Windows, MacOS & Linux")

    successMarkFile = os.path.join(FFMPEG_PATH, 'downloaded')
    # The package keeps the file name it has in the URL.
    ffmpegPackageName = os.path.join(FFMPEG_PATH, os.path.split(FFMPEG_URI)[1])

    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(FFMPEG_PATH, exist_ok=True)

    if not os.path.exists(successMarkFile):
        print("Downloading the ffmpeg package. Please wait...")
        download(FFMPEG_URI, ffmpegPackageName)
        unzipFile(ffmpegPackageName, FFMPEG_PATH)
        # Written only after download and extraction both returned, so an
        # interrupted run is retried next time.
        with open(successMarkFile, "w") as output:
            output.write("done")

    initEntryPoint()
    print("Ffmpeg binary loaded.")
# search file in the given path
def initEntryPoint():
    """Locate the ffmpeg executable below FFMPEG_PATH and store it in FFMPEG_ENTRY.

    Three glob patterns are tried in order: ``**/bin/ffmpeg.exe``,
    ``**/bin/ffmpeg`` and ``**/ffmpeg``. The first regular file found wins.
    A match from one of the two non-``.exe`` patterns also gets its execute
    bits set. If FFMPEG_ENTRY is already set, nothing is searched or changed.

    Raises:
        Exception: if no pattern matches a file.
    """
    global FFMPEG_ENTRY
    if FFMPEG_ENTRY:
        return

    # (pattern, whether a match needs the execute bit added)
    candidates = (
        (os.path.join(FFMPEG_PATH, '**/bin/ffmpeg.exe'), False),
        (os.path.join(FFMPEG_PATH, '**/bin/ffmpeg'), True),
        (os.path.join(FFMPEG_PATH, '**/ffmpeg'), True),
    )
    for pattern, needsExecBit in candidates:
        # Skip directories: a folder that happens to be called "ffmpeg" is
        # not the binary.
        found = next(
            (path for path in glob.iglob(pattern, recursive=True)
             if os.path.isfile(path)),
            None,
        )
        if found is None:
            continue
        if needsExecBit:
            try:
                os.chmod(found, os.stat(found).st_mode | 0o111)
            except OSError as err:
                # The binary may already be executable; report and carry on.
                print("Warning: could not mark {0} as executable: {1}".format(found, err))
        FFMPEG_ENTRY = found
        return

    raise Exception("ffmpeg binary not found!")
# unzip/unpack the file according to extension
def unzipFile(filename, targetDir):
    """Unpack the archive at filename into targetDir.

    Files ending in ``.zip`` are handled with zipfile, files ending in
    ``.tar.xz`` with lzma + tarfile. Any other name is silently ignored.
    """
    if filename.endswith('.zip'):
        with zipfile.ZipFile(filename, "r") as archive:
            archive.extractall(targetDir)
        return

    if filename.endswith('.tar.xz'):
        # Decompress the xz layer first, then read the tar stream inside it.
        with lzma.open(filename) as xzStream, tarfile.open(fileobj=xzStream) as archive:
            archive.extractall(targetDir)
# download the file and show progress
def download(url, filename):
    """Download url into filename, drawing a 50-column progress bar on stdout.

    When the server sends a usable Content-Length the body is streamed in
    chunks and the bar is redrawn after each chunk; otherwise the body is
    written in one go without a bar.

    Raises:
        requests.HTTPError: if the server answers with an error status, so an
            error page is never saved in place of the package.
    """
    # The context manager releases the connection even if writing fails.
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        # A missing, empty or zero Content-Length all mean "size unknown";
        # this also avoids dividing by zero in the progress computation.
        total = int(response.headers.get('content-length') or 0)
        with open(filename, 'wb') as f:
            if total <= 0:
                f.write(response.content)
            else:
                downloaded = 0
                chunkSize = max(total // 1000, 1024 * 1024)
                for data in response.iter_content(chunk_size=chunkSize):
                    downloaded += len(data)
                    f.write(data)
                    # Decoded bytes can exceed Content-Length for compressed
                    # responses; clamp so the bar keeps its width.
                    done = min(50, 50 * downloaded // total)
                    sys.stdout.write('\r[{}{}]'.format('=' * done, ' ' * (50 - done)))
                    sys.stdout.flush()
    sys.stdout.write('\n')
def detectMaxVolumeLevel(audioFile):
    """Return the max_volume value (in dB) that ffmpeg's volumedetect reports.

    Runs FFMPEG_ENTRY on audioFile with the volumedetect filter and a null
    output, then scans the stderr lines with VOLUME_PATTERN. If several
    lines match, the last one wins. Returns 0 when no line matches.
    """
    command = [
        FFMPEG_ENTRY, "-i", audioFile,
        "-filter:a", "volumedetect",
        "-f", "null", "/dev/null",
    ]
    peakLevel = 0
    with Popen(command, stdout=PIPE, stderr=PIPE) as ffmpeg:
        for rawLine in ffmpeg.stderr.readlines():
            # Lines arrive as bytes such as b'xxxx\r\n'; decode with the
            # console encoding and trim before matching.
            text = rawLine.decode(sys.stderr.encoding, 'ignore').strip()
            found = VOLUME_PATTERN.match(text)
            if found is not None:
                peakLevel = float(found.group('volumeLevel'))
    return peakLevel
def normalizeVolume(audioFile, outputFileName):
    """Write a copy of audioFile to outputFileName with its peak moved to 0 dB.

    The gain applied is the negated result of detectMaxVolumeLevel(). An
    existing output file is overwritten (ffmpeg ``-y``). Progress messages
    are printed; a non-zero ffmpeg exit code is reported instead of the
    "Finish tuning" message.
    """
    enableLog = False
    tuneLevel = 0 - detectMaxVolumeLevel(audioFile)
    print("Tuning Audio file: {0} with level: {2}{1} dB".format(
        audioFile, str(tuneLevel), "" if tuneLevel <= 0 else "+"))

    command = [
        FFMPEG_ENTRY, "-y", "-i", audioFile,
        "-filter:a", "volume={0}dB".format(str(tuneLevel)),
        outputFileName,
    ]
    with Popen(command, stdout=PIPE, stderr=PIPE) as proc:
        # communicate() drains both pipes until ffmpeg exits. Leaving the
        # with-block without reading would close the pipes while ffmpeg is
        # still writing its log to them.
        _, errOutput = proc.communicate()

    if enableLog:
        for line in errOutput.splitlines():
            print(line.decode(sys.stderr.encoding, 'ignore').strip())

    if proc.returncode != 0:
        print("Failed tuning: {0} (ffmpeg exit code {1})".format(audioFile, proc.returncode))
        return
    print("Finish tuning: {0}".format(audioFile))
def normalizeVolumeFromFileList(audioFiles: list):
    """Normalize every file in audioFiles, one after another.

    Each result goes to the path returned by getOutputFileName() for that
    input.
    """
    for audioPath in audioFiles:
        destination = getOutputFileName(audioPath)
        normalizeVolume(audioPath, destination)
def normalizeVolumeBatch(inputFolder):
    """Normalize every regular file directly inside inputFolder.

    Sub-folders are not entered. With a single core the files are processed
    sequentially in the calling thread. Otherwise they are split into
    CORE_COUNT slices and each non-empty slice is handled by its own thread;
    the call returns once all threads have finished.
    """
    candidates = []
    for entry in os.listdir(inputFolder):
        fullPath = os.path.join(inputFolder, entry)
        if os.path.isfile(fullPath):
            candidates.append(fullPath)

    if CORE_COUNT == 1:
        for path in candidates:
            normalizeVolume(path, getOutputFileName(path))
        return

    workers = []
    for chunk in split_list(candidates, CORE_COUNT):
        if not chunk:
            # Fewer files than cores: no thread for an empty slice.
            continue
        worker = threading.Thread(target=normalizeVolumeFromFileList, args=(chunk,))
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()
def getOutputFileName(inputFileName):
    """Return the output path for inputFileName and create its folder.

    The output keeps the input's base name and lives in a ``processed``
    sub-folder next to the input file. The folder is created if it is
    missing. If creation fails, a warning is printed and the path is
    returned anyway.
    """
    folder, baseName = os.path.split(inputFileName)
    outputDir = os.path.join(folder, "processed")
    try:
        # exist_ok makes this safe when several worker threads ask for the
        # same folder at once.
        os.makedirs(outputDir, exist_ok=True)
    except OSError as err:
        # Stay best-effort like before, but say what went wrong instead of
        # swallowing every exception.
        print("Warning: could not create {0}: {1}".format(outputDir, err))
    return os.path.join(outputDir, baseName)
def split_list(alist, wanted_parts=1):
    """Cut alist into wanted_parts consecutive slices of near-equal length.

    Slice i covers indexes [i*len // wanted_parts, (i+1)*len // wanted_parts),
    so slices can be empty when there are more parts than items.
    """
    total = len(alist)
    chunks = []
    for part in range(wanted_parts):
        begin = part * total // wanted_parts
        end = (part + 1) * total // wanted_parts
        chunks.append(alist[begin:end])
    return chunks
# Script body: prepare ffmpeg, then normalize the file or folder the user names.
initEnv()
initFfmpeg()
print(FFMPEG_ENTRY)
print("\n")
print("Drag/input the audio file/folder here for the path.")
print("To use current folder. Press ENTER directly...")
# A path dropped onto the terminal can arrive wrapped in quotes or with stray
# whitespace; strip both so the existence checks below see the real path.
unprocessedPath = input("Audio file/folder path: ").strip().strip('"\'')
if not unprocessedPath:
    # Empty answer means "use the current folder".
    unprocessedPath = "./"
if os.path.isdir(unprocessedPath):
    normalizeVolumeBatch(unprocessedPath)
elif os.path.isfile(unprocessedPath):
    outputName = getOutputFileName(unprocessedPath)
    normalizeVolume(unprocessedPath, outputName)
else:
    print("Path not valid")
# Keep the console window open until the user confirms.
input("Press ENTER to exist...")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment