Skip to content

Instantly share code, notes, and snippets.

@hemebond
Last active April 15, 2019 21:29
Show Gist options
  • Save hemebond/1501513 to your computer and use it in GitHub Desktop.
Save hemebond/1501513 to your computer and use it in GitHub Desktop.
Synchronise a directory with an .M3U playlist
#!/usr/bin/env python3
import os
import subprocess
import shutil
from time import time
import argparse
import codecs
import unicodedata
from tempfile import gettempdir
import configparser
VERBOSE = False
CONVERTER = 'ffmpeg'
'''
Example output paths:
- /run/user/1000/gvfs/afc:host=b896e7ad5dfe4e42517e50d3d08b79e58cd902bb,port=3/com.MyApp.cloudmusic$
'''
def which(program):
def is_exe(file_path):
return os.path.isfile(file_path) and os.access(file_path, os.X_OK)
file_path, file_name = os.path.split(program)
if file_path:
if is_exe(program):
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
path = path.strip('"')
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return exe_file
return None
def flac_to_ogg(flac_file):
"""
Converts a flac file to an ogg file
maintaining the existing metadata
Uses avconv (replacement for ffmpeg)
"""
ffmpeg_exe = which(CONVERTER)
codec = "libvorbis"
extension = ".ogg"
quality = "3"
if ffmpeg_exe is not None:
out = None if VERBOSE else open(os.devnull, "wb")
# Generate a random name for the temporary file
temp_file = os.path.join(gettempdir(), "%.7f%s" % (time(), extension))
subprocess.call(
[
ffmpeg_exe,
"-i", flac_file,
"-codec:a", codec,
"-qscale:a", quality,
temp_file
],
stdout=out,
stderr=out
)
if not VERBOSE:
out.close()
else:
raise RuntimeError("Executable %s not found" % CONVERTER)
return temp_file
def flac_to_mp3(flac_file):
"""
Converts a flac file to an ogg file
maintaining the existing metadata
Uses avconv (replacement for ffmpeg)
"""
ffmpeg_exe = which(CONVERTER)
extension = ".mp3"
if ffmpeg_exe is not None:
out = None if VERBOSE else open(os.devnull, "wb")
# Generate a random name for the temporary file
temp_file = os.path.join(gettempdir(), "%.7f%s" % (time(), extension))
subprocess.call(
[
ffmpeg_exe,
"-i", flac_file,
temp_file
],
stdout=out,
stderr=out
)
if not VERBOSE:
out.close()
else:
raise RuntimeError("Executable %s not found" % CONVERTER)
return temp_file
def ogg_to_mp3(flac_file):
"""
Converts a flac file to an ogg file
maintaining the existing metadata
Uses avconv (replacement for ffmpeg)
"""
ffmpeg_exe = which(CONVERTER)
extension = ".mp3"
if ffmpeg_exe is not None:
out = None if VERBOSE else open(os.devnull, "wb")
# Generate a random name for the temporary file
temp_file = os.path.join(gettempdir(), "%.7f%s" % (time(), extension))
subprocess.call(
[
ffmpeg_exe,
"-i", flac_file,
temp_file
],
stdout=out,
stderr=out
)
if not VERBOSE:
out.close()
else:
raise RuntimeError("Executable %s not found" % CONVERTER)
return temp_file
def ape_to_mp3(flac_file):
"""
Converts a flac file to an ogg file
maintaining the existing metadata
Uses avconv (replacement for ffmpeg)
"""
ffmpeg_exe = which(CONVERTER)
extension = ".mp3"
if ffmpeg_exe is not None:
out = None if VERBOSE else open(os.devnull, "wb")
# Generate a random name for the temporary file
temp_file = os.path.join(gettempdir(), "%.7f%s" % (time(), extension))
subprocess.call(
[
ffmpeg_exe,
"-i", flac_file,
temp_file
],
stdout=out,
stderr=out
)
if not VERBOSE:
out.close()
else:
raise RuntimeError("Executable %s not found" % CONVERTER)
return temp_file
def unicode_file(string):
return codecs.open(string, 'r', 'utf-8')
def audio_file(path, name, extension):
return {
'path': path,
'name': name,
'ext': extension
}
def make_abs_path(root, song):
components = [song['path'], song['name'] + song['ext']]
if not os.path.isabs(song['path']):
components.insert(0, root)
return os.path.join(*components)
def parse_m3u():
pass
def parse_pls():
pass
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Synchronise a directory with a playlist"
)
parser.add_argument(
"-p",
"--playlist",
metavar="FILE",
help="Playlist file",
type=unicode_file,
required=True
)
parser.add_argument(
"-o",
"--output",
metavar="DIR",
help="Output or destination directory",
required=True
)
parser.add_argument(
"-v",
"--verbose",
help="increase output verbosity",
action="store_true"
)
parser.add_argument(
"-d",
"--delete",
help="delete files from destination directory that aren't in the playlist",
action="store_true"
)
args = parser.parse_args()
VERBOSE = args.verbose
DELETE = args.delete
# All relative file paths in the playlist
# will be relative to the playlist
in_dir = os.path.dirname(os.path.abspath(args.playlist.name))
if VERBOSE:
print("Source: " + in_dir)
out_dir = os.path.abspath(args.output)
if os.path.exists(out_dir):
if VERBOSE:
print("Destination: " + out_dir)
else:
raise IOError("Can't find output directory %s" % out_dir)
# print(os.path.splitext(args.playlist.name))
# print(args.playlist.name)
playlist_name, playlist_extension = os.path.splitext(args.playlist.name)
if playlist_extension == '.pls':
config = configparser.ConfigParser()
config.read_file(args.playlist)
print(config['playlist'])
num_of_songs = config['playlist'].getint('numberofentries')
songs = []
for i in range(1, num_of_songs + 1):
songs.append(config['playlist']['file' + str(i)])
elif playlist_extension == '.m3u':
songs = []
for line in args.playlist:
if line[0] != '#' and len(line) > 4:
songs.append(line.strip())
# print(songs)
args.playlist.close()
#cover_art_directory = os.path.expanduser('~/.cache/rhythmbox/album-art')
# Get the list of files from the playlist file
pFiles = []
for file_path in songs:
if os.path.isabs(file_path):
abs_file_path = file_path
else:
abs_file_path = os.path.join(in_dir, file_path)
# Check that all files in the playlist actually exist
if os.path.exists(abs_file_path):
# Split the file_path into its components
(path, file_name) = os.path.split(file_path)
(name, extension) = os.path.splitext(file_name)
pFiles.append(
audio_file(path, name, extension)
)
else:
raise IOError("Could not find file %s" % abs_file_path)
# print(pFiles)
# exit()
# Get a list of all the files currently in the destination directory
dFiles = []
for file_path, dirs, files in os.walk(out_dir):
for file_name in files:
name, extension = os.path.splitext(file_name)
# If there are files in the root of the out_dir
# then there is no actual path
if file_path != out_dir:
path = file_path.replace(out_dir + os.sep, "")
else:
path = u""
dFiles.append(
audio_file(
unicodedata.normalize('NFC', path),
unicodedata.normalize('NFC', name),
extension
)
)
# Check to see if the file in the destination directory is in the playlist
# if it is, remove it from the list of playlist files (so we don't have to process it again later on)
# if not, delete the file from the destination directory
playlist_length = len(pFiles)
if playlist_length > 0:
for dFile in dFiles:
for i in range(playlist_length):
pFile = pFiles[i]
dFilename = os.path.join(dFile['path'], dFile['name'])
if os.path.isabs(pFile['path']):
pFilename = os.path.join(pFile['path'][1:], pFile['name'])
else:
pFilename = os.path.join(pFile['path'], pFile['name'])
if dFilename == pFilename:
# File found in playlist
del pFiles[i]
playlist_length -= 1
break
elif i == (playlist_length - 1):
# Searched the whole playlist and not found the file
if DELETE:
if VERBOSE:
print("Remove: %s" % make_abs_path(out_dir, dFile))
try:
os.remove(make_abs_path(out_dir, dFile))
except:
pass
else:
if VERBOSE:
print("Would remove: %s" % make_abs_path(out_dir, dFile))
# Any files left in pFiles need to be copied to out_dir
for pFile in pFiles:
# Turn an absolute path into a relative path for the destination
path = pFile['path']
if os.path.isabs(path):
path = path[1:]
# Create the directory tree in the destination directory
file_dir = os.path.join(out_dir, path)
if not os.path.isdir(file_dir):
os.makedirs(file_dir)
# Get the absolute path to the source file
src = make_abs_path(in_dir, pFile)
# Do we need to convert the file?
convert = False
if pFile['ext'].lower() == ".flac":
# Convert the .flac file to .mp3 and
# use the new temp file as the src
convert = True
src = flac_to_mp3(src)
# if pFile['ext'].lower() == ".ogg":
# convert = True
# src = ogg_to_mp3(src)
if pFile['ext'].lower() == ".ape":
convert = True
src = ape_to_mp3(src)
# Get the extension of the src file
extension = os.path.splitext(src)[1]
# Set the destination file path
dst = os.path.join(out_dir, path, pFile['name'] + extension)
# Copy the file across
if VERBOSE:
print("Copy: %s -> %s" % (src, dst))
try:
shutil.copyfile(src, dst)
except FileNotFoundError as e:
if convert:
print("Could not convert file \"%s\"" % dst)
else:
print("No such file or directory \"%s\"" % src)
pass
if convert:
# Delete the temporary file
if VERBOSE:
print("Cleanup: %s" % src)
try:
os.remove(src)
except FileNotFoundError as e:
# Something went wrong with the conversion
print("No such file or directory \"%s\"" % src)
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment