Last active
May 14, 2021 22:12
-
-
Save elijahr/ceb725ede4d442f327f2ef355f8ef62a to your computer and use it in GitHub Desktop.
scripts for copying folders of multiple audio file types to mp3 playlists. so for instance, if you have a large library of lossless (WAV, FLAC, etc) files and want to copy them to a portable mp3 player with limited space.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import asyncio | |
import multiprocessing | |
import mimetypes | |
import os | |
import shlex | |
import shutil | |
import sys | |
from contextlib import contextmanager | |
from queue import Queue | |
WORKER_COUNT = multiprocessing.cpu_count() | |
parser = argparse.ArgumentParser( | |
description="Convert to mp3 and make m3u playlists", | |
) | |
parser.add_argument( | |
"source", | |
metavar="dirs", | |
nargs="*", | |
default=[os.path.abspath(".")], | |
help="source directory", | |
) | |
parser.add_argument( | |
"--dest", | |
default="/Users/elijahrutschman/Desktop/trax-mp3", | |
help="destination directory", | |
) | |
args = parser.parse_args() | |
@contextmanager | |
def chdir(path): | |
prev_cwd = os.getcwd() | |
os.chdir(path) | |
try: | |
yield | |
finally: | |
os.chdir(prev_cwd) | |
class Track: | |
def __init__(self, source): | |
self.source = source | |
@property | |
def dest(self): | |
return os.path.join( | |
args.dest, | |
os.path.basename(os.path.dirname(self.source)), | |
f"{os.path.splitext(os.path.basename(self.source))[0]}.mp3", | |
) | |
async def process(self): | |
if os.path.exists(self.dest): | |
return | |
else: | |
try: | |
os.makedirs(os.path.dirname(self.dest)) | |
except FileExistsError: | |
pass | |
if self.source.endswith("mp3"): | |
shutil.copyfile(self.source, self.dest) | |
else: | |
cmd = shlex.join( | |
[ | |
"ffmpeg", | |
"-i", | |
self.source, | |
"-codec:a", | |
"libmp3lame", | |
"-qscale:a", | |
"0", | |
"-vsync", | |
"2", | |
self.dest, | |
] | |
) | |
proc = await asyncio.create_subprocess_shell(cmd) | |
await proc.communicate() | |
class Playlist: | |
def __init__(self, source): | |
self.source = os.path.abspath(source) | |
@property | |
def dest(self): | |
return os.path.join( | |
args.dest, | |
os.path.basename(self.source), | |
) | |
def tracks(self): | |
for (dirpath, dirnames, filenames) in os.walk(self.source): | |
for filename in filenames: | |
if filename.startswith("._"): | |
# weird hidden files, no thanks | |
continue | |
mtype = mimetypes.guess_type(filename)[0] | |
if ( | |
mtype | |
and mtype.startswith("audio/") | |
and not filename.endswith(".m3u") | |
): | |
yield Track(os.path.join(dirpath, filename)) | |
class QueueProcessor: | |
def playlists(self): | |
for source in args.source: | |
for (dirpath, dirnames, filenames) in os.walk(source): | |
playlist = Playlist(dirpath) | |
if any(playlist.tracks()): | |
yield playlist | |
async def enqueue(self, playlist): | |
for track in playlist.tracks(): | |
await self.queue.put(track) | |
async def worker(self): | |
while True: | |
track = await self.queue.get() | |
await track.process() | |
self.queue.task_done() | |
async def main(self): | |
self.queue = asyncio.Queue(maxsize=WORKER_COUNT) | |
# Start tasks to process queue | |
tasks = {asyncio.create_task(self.worker()) for i in range(WORKER_COUNT)} | |
# Enqueue all files for processing | |
for playlist in self.playlists(): | |
await self.enqueue(playlist) | |
# Wait for queue to be empty | |
await self.queue.join() | |
# Cancel our worker tasks. | |
for task in tasks: | |
task.cancel() | |
# Wait until all worker tasks are cancelled. | |
await asyncio.gather( | |
*tasks, | |
return_exceptions=True, | |
) | |
# Create m3u | |
for playlist in self.playlists(): | |
with chdir(playlist.dest): | |
proc = await asyncio.create_subprocess_shell("make-m3u .") | |
await proc.communicate() | |
qp = QueueProcessor() | |
asyncio.run(qp.main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" Creates an m3u playlist with relative paths inside. | |
Playlist is created inside the dir supplied. | |
All media files inside the root dir are sorted using 'natural' sort. | |
""" | |
import argparse | |
import os | |
def generate_playlist_info(root_dir_path): | |
for (dirpath, _dirnames, filenames) in os.walk(root_dir_path): | |
media_files = [] | |
for filename in filenames: | |
if filename.startswith('.'): | |
continue | |
extension = os.path.splitext(filename)[1].lower() | |
if extension in ['.mp4', '.mp3', '.mkv', '.aac', '.aiff', '.wav', '.flac', '.aif']: | |
media_files.append(os.path.basename(filename)) | |
if media_files: | |
yield os.path.join(dirpath, os.path.basename(os.path.abspath(dirpath))+ '.m3u'), media_files | |
def create_playlist(playlist_path, filenames): | |
print() | |
print(' >>> ' + playlist_path + ' >>> ') | |
with open(playlist_path, 'w', encoding='utf-8') as f: | |
f.write('#EXTM3U' + '\r\n') | |
for filename in filenames: | |
f.write(filename + '\r\n') | |
print(' ' + filename) | |
print(' <<< ' + playlist_path + ' <<< ') | |
print() | |
def parse_arguments(): | |
""" Parses the arguments | |
directory is required argument | |
playlist name defaults to playlist.m3u | |
sort_by_files defaults to False""" | |
parser = argparse.ArgumentParser() | |
parser.add_argument("directory", help="root directory") | |
parser.add_argument('--lossless-to-flac', action="store_true", help='convert lossless files (wav, aiff, etc) to flac') | |
return parser.parse_args() | |
def main(): | |
""" Entry point function """ | |
args = parse_arguments() | |
for m3u, filenames in generate_playlist_info(args.directory): | |
create_playlist(m3u, filenames) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment