Skip to content

Instantly share code, notes, and snippets.

@justinvw
Created May 20, 2012 11:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save justinvw/2757713 to your computer and use it in GitHub Desktop.
Save justinvw/2757713 to your computer and use it in GitHub Desktop.
Transcode FLAC audio files into MP4/AAC files
#!/usr/bin/env python
# Requirements:
# - beets (https://github.com/sampsyo/beets)
# - flac (http://flac.sourceforge.net/)
# - faac (http://www.audiocoding.com/faac.html)
# TODOS:
# - Use multiple threads (multiprocessing)
# - Replaygain data is copied from src to dest file, but is this fair?
# Or is recalculation necessary on transcoded file?
# - Does albumart embedding in the dest_file work corectly?
# - Currently only supports FLAC to AAC/MP4 conversion, add other formats
# - Intergrate with beets?
import os
import shutil
import logging
import subprocess
import multiprocessing
import fnmatch
from beets.mediafile import MediaFile
logging.basicConfig(level=logging.DEBUG)
TRANSCODE_SRC_FORMATS = ['.flac']
TRANSCODE_DEST_FORMAT = '.mp4'
# Files we don't want to copy under any condition
SKIP_FILES = ['.DS_Store', '._.DS_Store']
MAX_PROCESSES = 7
class File(object):
def __init__(self, src_file, dest_file):
self.src_file = src_file
self.dest_file = dest_file
# Create directory if it does not exist
self.create_dir(os.path.dirname(self.dest_file))
def __repr__(self):
return '%s.%s(src_file=%s, dest_file=%s)' %(self.__class__.__module__,
self.__class__.__name__, self.src_file, self.dest_file)
def create_dir(self, directory):
""" Create directories for a given path (if it does not exist). """
if not os.path.isdir(directory):
logging.info("Creating directory: %s", directory)
os.makedirs(directory)
class AudioFile(File):
""" Transcodes the audio file and copies the metadata """
def __init__(self, src_file, dest_file):
super(AudioFile, self).__init__(src_file, dest_file)
# Transcode the audio file
transcode_returncode = self.transcode(src_file, dest_file)
if transcode_returncode == 0:
logging.info("Transcoded %s to %s", src_file, dest_file)
else:
logging.error("Failed transcodeing %s to %s", src_file, dest_file)
# Copy metadate to transcoded file
if transcode_returncode == 0:
self.copy_metadata(src_file, dest_file)
logging.info("Copied metadata from %s to %s", src_file, dest_file)
def transcode(self, src_file, dest_file):
# flac --totally-silent -dc <filename>
# faac -w -o "blah.mp4" -q 250 -
logging.debug("Transcoding %s to %s", src_file, dest_file)
#flac = subprocess.Popen(['flac', '-dcs', src_file],
# stdout=subprocess.PIPE)
#mp4 = subprocess.Popen(['faac', '-q 300', '-w', '-o', dest_file, '-'],
# stdin=flac.stdout)
flac = subprocess.Popen(['flac', '-d', src_file, '-o'
'%s.wav.temp' %(dest_file)])
if flac.wait() != 0:
return 1
mp4 = subprocess.Popen(['faac', '-q 300', '-w', '-o', dest_file,
'%s.wav.temp' %(dest_file)])
returncode = mp4.wait()
os.remove('%s.wav.temp' %(dest_file))
return returncode
def copy_metadata(self, src_file, dest_file):
src = MediaFile(src_file)
dest = MediaFile(dest_file)
# The attributes we want to copy if they exist in the src_file
attributes = ['acoustid_fingerprint', 'acoustid_id', 'album',
'albumartist', 'albumartist_sort', 'albumdisambig',
'albumstatus', 'albumtype', 'art', 'artist',
'artist_sort', 'asin', 'bpm', 'catalognum',
'comments', 'comp', 'composer', 'country', 'date',
'day', 'disc', 'disctitle', 'disctotal'
'genre', 'grouping', 'label', 'language', 'lyrics',
'mb_albumartistid', 'mb_albumid', 'mb_artistid',
'mb_releasegroupid', 'mb_trackid', 'month',
'rg_album_gain', 'rg_album_peak', 'rg_track_gain',
'rg_track_peak', 'title', 'track', 'tracktotal',
'year']
for attribute in attributes:
# Get the value of the attribute from the src_file if it exists
try:
value = src.__getattribute__(attribute)
logging.debug(u'Metadata field \'%s\' for %s has values \'%s\'',
attribute, src_file, value)
except AttributeError:
logging.debug('Metadata field \'%s\' for %s does not exist',
attribute, src_file)
else:
# Do nothing if the value is empty
if value:
dest.__setattr__(attribute, value)
dest.save()
class OtherFile(File):
""" Handles all files that are not audio (covers, nfo's, etc.) """
def __init__(self, src_file, dest_file):
super(OtherFile, self).__init__(src_file, dest_file)
# Copy the file
self.copy_file(self.src_file, self.dest_file)
def copy_file(self, src, dest):
logging.info("Copying file %s to %s", src, dest)
shutil.copyfile(src, dest)
class TranscodeFinder(object):
""" Finds the files that need to be transcoded or copied and creates the
directory structure. """
def __init__(self, src_dir, src_exts, skip_files, dest_dir, dest_ext):
self.src_dir = src_dir
self.src_exts = src_exts
self.skip_files = skip_files
self.dest_dir = dest_dir
self.dest_ext = dest_ext
self.tasks = self.get_tasks(src_dir, skip_files, dest_dir)
def get_tasks(self, src_dir, skip_files, dest_dir):
""" Determine which files need to moved and/or transcoded. """
source_files = self.walk_directories(src_dir)
files = []
for src_file in source_files:
# If this file appears in skip_files stop processing it
if os.path.split(src_file)[1] in skip_files:
continue
src_file_rel = os.path.relpath(src_file, self.src_dir)
src_base, src_ext = os.path.splitext(src_file_rel)
if src_ext in self.src_exts:
dest_relpath = src_base + self.dest_ext
else:
dest_relpath = src_file_rel
dest_file = os.path.join(self.dest_dir, dest_relpath)
if self.requires_processing(src_file, dest_file):
if src_ext in self.src_exts:
files.append({
'handler': AudioFile,
'args': {'src_file': src_file, 'dest_file': dest_file}
})
else:
files.append({
'handler': OtherFile,
'args': {'src_file': src_file, 'dest_file': dest_file}
})
logging.info('Found %s files in src_dir that need to be processed.'
%(len(files)))
return files
def walk_directories(self, directory):
""" Recursively walk a directory, yield absolute paths to files. """
for root, dirs, files in os.walk(directory):
for basename in files:
yield os.path.join(root, basename)
def requires_processing(self, src_file, dest_file):
""" Returns True if dest_file does not exist or is older than
src_file, else returens False. """
logging.debug('Checking if \'%s\' needs to be processed.' %(dest_file))
if os.path.isfile(dest_file):
# dest_file exist, check if it is newer than src_file
src_time = os.path.getmtime(src_file)
dest_time = os.path.getmtime(dest_file)
if src_time > dest_time:
logging.debug('\'%s\' does exist but source (\'%s\') is more'\
'recent.' %(dest_file, dest_file))
return True
else:
logging.debug('\'%s\' already exists.' %(dest_file))
return False
else:
# dest_file does not exist and thus requires processing
logging.debug('\'%s\' does not yet exist.' %(dest_file))
return True
if __name__ == '__main__':
transcode = TranscodeFinder('/data/audio/music_lossless/',
TRANSCODE_SRC_FORMATS, SKIP_FILES,
'/home/justin/tools/transfercoder_test/mp4/', TRANSCODE_DEST_FORMAT)
#for task in transcode.tasks:
# task['handler'](**task['args'])
pool = multiprocessing.Pool(processes=MAX_PROCESSES)
for task in transcode.tasks:
print task
pool.apply_async(func=task['handler'], kwds=task['args'])
pool.close()
pool.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment