Created
December 20, 2020 12:18
-
-
Save markk/aab817e24ba6ff142795a2803c43f5cc to your computer and use it in GitHub Desktop.
phaseanalyze: analyze a set of audio files to find optimum phase configuration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Analyze a set of audio files to find optimum phase configuration.
"""
import argparse
import itertools
import os
import shlex
import subprocess

import tabulate
# Longest input, in whole seconds, that execute() accepts; longer files raise TooLongError.
MAXIMUM_DURATION = 60
# Directory in which splitmultichannel() creates its mono files.
TMPDIR = "/tmp"
class TooLongError(Exception):
    """Raise when input files are too long to analyse.

    Attributes:
        audiofile: The file the error refers to, as passed in.
        duration: The duration in seconds, as passed in.
        message: The formatted error text (also the exception's str()).
    """

    def __init__(self, audiofile, duration):
        # Keep the raw values so callers need not parse them out of the message.
        self.audiofile = audiofile
        self.duration = duration
        self.message = f"File {audiofile} is {duration} seconds long!"
        super().__init__(self.message)
class IdentificationFailed(Exception):
    """Raise when sndfile-info fails.

    Attributes:
        audiofile: The file that could not be identified, as passed in.
        message: The formatted error text (also the exception's str()).
    """

    def __init__(self, audiofile):
        # Keep the raw value so callers need not parse it out of the message.
        self.audiofile = audiofile
        self.message = f"Audio identification of {audiofile} failed!"
        super().__init__(self.message)
class MixAnalyzeFailed(Exception):
    """Raise when sox mix call fails.

    Attributes:
        cmd: The argument list of the failed call, as passed in.
        message: The error text, containing the command in shell-quoted form.
    """

    def __init__(self, cmd):
        self.cmd = cmd
        # shlex.join separates the arguments with spaces and quotes any that
        # contain whitespace, so the reported command stays readable and can be
        # pasted into a shell. (Joining with "" ran all arguments together.)
        command_line = shlex.join(str(arg) for arg in cmd)
        self.message = f"Call to sox failed!\n {command_line}"
        super().__init__(self.message)
def readaudiofile(audiofile):
    """Return channels, frames, samplerate.

    Runs ``sndfile-info`` on *audiofile* and reads the values from the output
    lines that start with ``Channels``, ``Frames`` and ``Sample Rate``. If a
    label occurs on several lines, the last one wins.

    Args:
        audiofile: Path of the audio file to inspect.

    Returns:
        A ``(channels, frames, rate)`` tuple of ints.

    Raises:
        IdentificationFailed: If any of the three values is missing from the
            output or is not an integer.
    """
    print("analysing file {0}...".format(audiofile))
    info = subprocess.run(["sndfile-info", audiofile], stdout=subprocess.PIPE)
    # Line prefix -> parsed value; None until a matching line has been seen.
    fields = {"Channels": None, "Frames": None, "Sample Rate": None}
    # errors="replace": the tool's output may contain bytes that are not valid
    # UTF-8 (presumably file names or embedded metadata); those must not abort
    # the analysis, since only the numeric lines are of interest.
    for infoline in info.stdout.decode(errors="replace").split("\n"):
        for label in fields:
            if infoline.startswith(label):
                try:
                    fields[label] = int(infoline.rpartition(":")[2].strip())
                except ValueError as err:
                    # Report an unparsable value as a failed identification
                    # instead of leaking a bare ValueError.
                    raise IdentificationFailed(audiofile) from err
    if None in fields.values():
        raise IdentificationFailed(audiofile)
    return fields["Channels"], fields["Frames"], fields["Sample Rate"]
def splitmultichannel(audiofile, channels):
    """Split into mono audio files.

    For every channel (numbered from 1) this runs ``sox <audiofile> <monofile>
    remix <channel>``, where the mono file is ``<channel>_<basename>`` inside
    TMPDIR.

    Args:
        audiofile: Path of the multichannel source file.
        channels: Number of channels to extract.

    Returns:
        List of the mono file paths, in channel order.

    Raises:
        subprocess.CalledProcessError: If sox exits with a non-zero status.
    """
    audiobasename = os.path.basename(audiofile)
    monofiles = []
    for channel in range(1, channels + 1):
        monofile = os.path.join(TMPDIR, "{}_{}".format(channel, audiobasename))
        # check=True: stop here when sox fails rather than returning the path
        # of a file that was never written.
        subprocess.run(["sox", audiofile, monofile, "remix", str(channel)], check=True)
        monofiles.append(monofile)
    return monofiles
def processpermutation(audiofiles, permutation):
    """Mix files with sox and measure RMS level.

    Builds ``sox -m [options] file ... -n stats`` and reads the result from
    the ``RMS lev dB`` line that sox writes to stderr.

    Args:
        audiofiles: Paths of the files to mix.
        permutation: One sox option string per file, in the same order as
            *audiofiles*: ``""`` for no options, or a whitespace-separated
            option string such as ``"-v -1"``.

    Returns:
        The value of the ``RMS lev dB`` line as a float.

    Raises:
        MixAnalyzeFailed: If sox's stderr contains no ``RMS lev dB`` line.
    """
    cmd = ["sox", "-m"]
    for phase, audiofile in zip(permutation, audiofiles):
        # Hand every option token to sox as its own argument instead of one
        # combined "-v -1" string; an empty option string contributes nothing.
        cmd.extend(phase.split())
        cmd.append(str(audiofile))
    cmd.extend(["-n", "stats"])
    info = subprocess.run(cmd, stderr=subprocess.PIPE)
    # errors="replace": undecodable bytes in sox's messages must not hide the result.
    for infoline in info.stderr.decode(errors="replace").split("\n"):
        if infoline.startswith("RMS lev dB"):
            return float(infoline.split("dB")[-1].strip())
    raise MixAnalyzeFailed(cmd)
def execute(args):
    """Measure every phase permutation of the input files and print the loudest.

    Multichannel inputs are split into temporary mono files first. Each file is
    then either left as is or given the sox options ``-v -1``, every
    combination is mixed and measured with processpermutation(), and the
    combination with the highest RMS level is printed as a table, together
    with the unchanged and optimized levels.

    Args:
        args: Parsed command line; ``args.audiofiles`` is the list of input paths.

    Raises:
        TooLongError: If an input is longer than MAXIMUM_DURATION seconds.
    """
    audiofiles = []
    tmpmonofiles = []
    try:
        for audiofile in args.audiofiles:
            channels, frames, rate = readaudiofile(audiofile)
            if (duration := int(frames / rate)) > MAXIMUM_DURATION:
                raise TooLongError(audiofile, duration)
            if channels > 1:
                tmpmonofiles.extend(splitmultichannel(audiofile, channels))
            else:
                audiofiles.append(audiofile)
        # Original mono files come first, followed by the split channels.
        audiofiles.extend(tmpmonofiles)

        results = {}
        for permutation in itertools.product(["", "-v -1"], repeat=len(audiofiles)):
            print(".", end="", flush=True)
            inversepermutation = tuple("" if x else "-v -1" for x in permutation)
            # A permutation and its complete inverse are treated as having the
            # same level, so only one of each pair is measured.
            if inversepermutation not in results:
                results[permutation] = processpermutation(audiofiles, permutation)
            elif len("".join(permutation)) < len("".join(inversepermutation)):
                # Keep the result under the key that has fewer files inverted.
                results[permutation] = results[inversepermutation]
                del results[inversepermutation]

        maxkey = max(results, key=results.get)
        optimization = [["Channel", "Phase"]]
        optimization.extend(zip(audiofiles, ["ø" if x else "" for x in maxkey]))
        print("\n\n", tabulate.tabulate(optimization, headers="firstrow"))
        unchanged = ("",) * len(audiofiles)
        print("\nUnchanged RMS {} dB".format(results[unchanged]))
        print("Optimized RMS {} dB".format(results[maxkey]))
        print("\n{} phase permutations calculated.".format(len(results)))
    finally:
        # delete mono audio files, also when the analysis was aborted by an error
        for tmf in tmpmonofiles:
            try:
                os.unlink(tmf)
            except FileNotFoundError:
                # Nothing to remove; a cleanup failure must not replace the
                # exception that is already propagating.
                pass
if __name__ == "__main__":
    # Command-line entry point: the module docstring doubles as the --help
    # description, and at least one FILE argument is required.
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("audiofiles", metavar="FILE", nargs="+", help="audio files to analyze")
    args = parser.parse_args()
    execute(args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment