Skip to content

Instantly share code, notes, and snippets.

@itzexor
Last active June 17, 2024 21:52
Show Gist options
  • Save itzexor/46a6224fba7a430344799842c3bc79b6 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
from dataclasses import dataclass
from datetime import datetime
from glob import glob
from shlex import quote
from shutil import copyfile
from subprocess import Popen, PIPE, DEVNULL, CalledProcessError
from concurrent.futures import ThreadPoolExecutor, Future
from argparse import ArgumentParser
from signal import signal, SIGINT
from enum import Enum, auto
import threading
from urllib.request import urlopen
from html.parser import HTMLParser
import html
import json
import os
import sys
import argparse
import re
import traceback
import time
# --- tuning and metadata formatting constants ---
DEFAULT_THREAD_LIMIT = 4
'''Default max simultaneous jobs'''
DELIM_NAME = ', '
'''Name-type metadata field delimiter'''
DELIM_GENRE = '; '
'''Genre-type metadata field delimiter'''
#some specific jank to remove
# NOTE: the trailing comma is required — without it this is a plain string,
# and `name in NAMES_REMOVE` becomes a substring search instead of the
# exact membership test documented below.
NAMES_REMOVE = (
    'The Great Courses',
)
'''Exact name matches to remove entirely'''
NAMES_REPLACE = (
    (' - introductions', ''),
    ('James S.A. Corey', 'James S. A. Corey'),
    ('eden Hudson', 'Eden Hudson')
)
'''(string, replace) pairs to replace within names'''
# base ffmpeg invocation; per-call args are appended to this tuple
FFMPEG_CMD = ('ffmpeg', '-loglevel', 'error')
# default kwargs for every Popen; callers override stdin/stdout as needed
DEFAULT_POPEN_KWARGS = {
    'bufsize': 0,
    'pipesize': 1048576,
    'stderr': DEVNULL,
    'stdin': DEVNULL,
    'stdout': DEVNULL
}
POPEN_PIPE_CHUNK_SIZE = 65535
'''In-app read chunk size from decoder pipe'''
POLLING_INTERVAL = 1/10
'''General polling interval in seconds'''
PROGRESS_INTERVAL = 1
'''Progress printing interval in seconds'''
# crude html tag matcher used by clean_text() to strip markup
REGEX_MATCH_HTML = r'<\/?[\w\s]*>|<.+[\W]>'
# (forbidden, replacement) pairs: characters invalid in filenames on common
# filesystems are swapped for unicode look-alikes by clean_filename()
FILENAME_CHAR_MAP = (
    ('<', '﹤'),
    ('>', '﹥'),
    (':', 'ː'),
    ('"', '“'),
    ('/', '⁄'),
    ('|', '⼁'),
    ('?', '﹖'),
    ('*', '﹡'),
    ('\\', '∖')
)
# fixed: %m is strftime's *month* directive; %M is minutes, which is what a
# HH:MM:SS log timestamp needs
PRINT_TIME_FMT = '%H:%M:%S'
'''App().print() timestamping format'''
# {start}/{end} below are in milliseconds (TIMEBASE=1/1000)
FFMPEG_CHAPTER_FMT = \
'''[CHAPTER]
TIMEBASE=1/1000
START={start}
END={end}
TITLE={title}'''
'''FFMPEG FFMETADATA chapter template'''
# outer document wrapping any number of MATROSKA_TAG_SIMPLE_FMT entries
MATROSKA_TAGS_XML_FMT = \
'''<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Tags SYSTEM "matroskatags.dtd">
<Tags>
{tags}
</Tags>
'''
'''Matroska tags file template'''
MATROSKA_TAG_SIMPLE_FMT = \
''' <Tag>
<Simple>
<Name>{key}</Name>
<String>{value}</String>
</Simple>
</Tag>'''
'''Matroska simple tag template'''
# outer document wrapping any number of MATROSKA_CHAPTER_ATOM_FMT entries
MATROSKA_CHAPTERS_XML_FMT = \
'''<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Chapters SYSTEM "matroskachapters.dtd">
<Chapters>
<EditionEntry>
{atoms}
</EditionEntry>
</Chapters>
'''
'''Matroska chapters file template'''
# {start}/{end} here are hh:mm:ss.fff strings (see ms_to_fftime)
MATROSKA_CHAPTER_ATOM_FMT = \
''' <ChapterAtom>
<ChapterUID>{uid}</ChapterUID>
<ChapterTimeStart>{start}</ChapterTimeStart>
<ChapterTimeEnd>{end}</ChapterTimeEnd>
<ChapterDisplay>
<ChapterString>{title}</ChapterString>
</ChapterDisplay>
</ChapterAtom>'''
'''Matroska chapter atom template'''
class OperationCancelled(Exception):
    '''Raised when a running operation is aborted by user cancellation.'''
class MetadataNotLoaded(Exception):
    # NOTE(review): never raised anywhere in this file — presumably intended
    # for metadata access before AaxcFile.load(); confirm before removing.
    pass
class StrEnum(Enum):
    '''Enum base whose members print as their string values.'''

    def __str__(self):
        # render as the member's value rather than "ClassName.MEMBER"
        return self.value

    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        # auto() values are kebab-cased member names: ENUM_MEMBER -> "enum-member"
        return name.replace('_', '-').lower()
# https://stackoverflow.com/a/60750535
class EnumAction(argparse.Action):
    """
    Argparse action that maps choice strings onto Enum members.
    """
    def __init__(self, **kwargs):
        # the 'type' kwarg carries the Enum class itself, not a converter
        enum_type = kwargs.pop('type', None)
        if enum_type is None:
            raise ValueError('type must be assigned an Enum when using EnumAction')
        if not issubclass(enum_type, Enum):
            raise TypeError('type must be an Enum when using EnumAction')
        # unless the caller overrides, choices are the members' values
        kwargs.setdefault('choices', tuple(member.value for member in enum_type))
        super().__init__(**kwargs)
        self._enum = enum_type

    def __call__(self, parser, namespace, values, option_string=None):
        # convert the raw string back into its Enum member
        setattr(namespace, self.dest, self._enum(values))
class Container(StrEnum):
    '''Output container format'''
    # values via StrEnum auto(): 'mp4', 'ogg', 'webm' — doubles as CLI choices
    MP4 = auto()
    '''opus in mp4 container, "m4b" file extension'''
    # NOTE(review): transcode() actually appends '.mp4', not '.m4b' — confirm intent
    OGG = auto()
    '''opus in ogg container, "opus" file extension'''
    WEBM = auto()
    '''opus in webm container, "webm" file extension'''
class Quality(StrEnum):
    '''Output opus quality setting'''
    # values via StrEnum auto(): 'mono-voice', 'stereo-voice', 'stereo'
    MONO_VOICE = auto()
    '''mono 32k voice mode'''
    STEREO_VOICE = auto()
    '''stereo 48k voice mode'''
    STEREO = auto()
    '''stereo 64k auto'''
@dataclass
class Chapter:
    '''Represents a single chapter within an AaxcFile()'''
    # duration and both offsets are milliseconds (sourced from the
    # 'length_ms'/'start_offset_ms' fields of the chapters json)
    index: int
    '''Chapter index'''
    title: str
    '''Chapter title'''
    duration: int
    '''Chapter duration'''
    input_offset: int
    '''Start offset relative to AaxcFile() input file'''
    output_offset: int
    '''Start offset relative to AaxcFile() output file'''
class AudnexusSummaryFormatter(HTMLParser):
'''
Converts an audnexus/audible summary from html to plain text with stylized unicode
character based formatting.
'''
UNI_FORMAT_MAP = {
'keys': 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',
'value_sets': {
'b': '𝐚𝐛𝐜𝐝𝐞𝐟𝐠𝐡𝐢𝐣𝐤𝐥𝐦𝐧𝐨𝐩𝐪𝐫𝐬𝐭𝐮𝐯𝐰𝐱𝐲𝐳𝐀𝐁𝐂𝐃𝐄𝐅𝐆𝐇𝐈𝐉𝐊𝐋𝐌𝐍𝐎𝐏𝐐𝐑𝐒𝐓𝐔𝐕𝐖𝐗𝐘𝐙𝟎𝟏𝟐𝟑𝟒𝟓𝟔𝟕𝟖𝟗',
'i': '𝑎𝑏𝑐𝑑𝑒𝑓𝑔ℎ𝑖𝑗𝑘𝑙𝑚𝑛𝑜𝑝𝑞𝑟𝑠𝑡𝑢𝑣𝑤𝑥𝑦𝑧𝐴𝐵𝐶𝐷𝐸𝐹𝐺𝐻𝐼𝐽𝐾𝐿𝑀𝑁𝑂𝑃𝑄𝑅𝑆𝑇𝑈𝑉𝑊𝑋𝑌𝑍0123456789',
'bi': '𝒂𝒃𝒄𝒅𝒆𝒇𝒈𝒉𝒊𝒋𝒌𝒍𝒎𝒏𝒐𝒑𝒒𝒓𝒔𝒕𝒖𝒗𝒘𝒙𝒚𝒛𝑨𝑩𝑪𝑫𝑬𝑭𝑮𝑯𝑰𝑱𝑲𝑳𝑴𝑵𝑶𝑷𝑸𝑹𝑺𝑻𝑼𝑽𝑾𝑿𝒀𝒁𝟎𝟏𝟐𝟑𝟒𝟓𝟔𝟕𝟖𝟗'
}
}
'''HTML-to-Unicode map'''
def __init__(self, *, convert_charrefs: bool = True) -> None:
self.states = {}
self.buffer = ''
self.needs_newlines = False
super().__init__(convert_charrefs=convert_charrefs)
def get_state(self, key):
return not not self.states[key] if key in self.states else False
def set_state(self, key, state):
if key in self.states:
if state:
self.states[key] += 1
elif self.states[key]:
self.states[key] -= 1
else:
self.states[key] = 1 if state else 0
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
self.set_state(tag, True)
def handle_endtag(self, tag: str) -> None:
self.set_state(tag, False)
if tag == 'p' and not self.get_state('p'):
self.needs_newlines = True
def handle_data(self, data: str) -> None:
if self.needs_newlines and self.get_state('p'):
self.buffer += '\n\n'
self.needs_newlines = False
style = 'b' if self.get_state('b') else ''
if self.get_state('i'):
style = f'{style}i'
if style:
for char in data:
if (i := AudnexusSummaryFormatter.UNI_FORMAT_MAP['keys'].find(char)) != -1:
self.buffer += AudnexusSummaryFormatter.UNI_FORMAT_MAP['value_sets'][style][i]
else:
self.buffer += char
else:
self.buffer += data
def process(self, input=''):
self.buffer = ''
self.states = {}
self.feed(input)
return self.buffer
class AaxcFile:
    '''
    Lazily-loaded view of an audible-cli .aaxc download and its sidecar files
    (voucher, chapters json, cover jpg, cached audnexus metadata json).
    Construction only records paths; call load() to read everything.
    '''
    def __init__(self, aaxc_path: str, combine_chapter_names = False, use_metadata_cache = True) -> None:
        self._use_cache = use_metadata_cache
        self._combine_chapter_names = combine_chapter_names
        self._metadata_loaded = False
        self._location, self._filename = os.path.split(aaxc_path)
        self._filename, _ = os.path.splitext(self._filename)
        self.input_file = aaxc_path
        '''input aaxc file'''
        self.input_sample_rate: int | None
        '''aaxc sample rate'''
        self.input_bit_rate: int
        '''aaxc bit rate'''
        self.input_duration: int
        '''input file duration'''
        self.input_start_offset: int
        '''input file audible intro length'''
        self.input_end_offset: int
        '''input file audible outro length'''
        self.key: str
        '''aaxc encryption key'''
        self.iv: str
        '''aaxc encryption initialization vector'''
        self.asin: str
        '''audible asin'''
        self.input_base_filename: str
        '''common filename prefix between aaxc, voucher, chapters, and cover files'''
        self.cover_file: str
        '''cover jpg file associated with this aaxc, if any'''
        self.metadata: dict[str, str]
        '''dict containing metadata tags for the output file(s), only contains 'asin' until metadata import'''
        self.output_duration = None
        '''output file duration'''
        self.output_filename = None
        '''output filename without an extension, only valid after metadata import'''
        self.output_directory = None
        '''full destination output directory, only valid after metadata import'''
        # annotation quoted so Chapter is not evaluated at annotation time
        self.chapters: 'tuple[Chapter, ...]'
        '''a tuple containing Chapter() entries'''

    def _load_initial(self):
        '''Read the .voucher sidecar: decryption key/iv, asin, stream params, cover.'''
        with open(f'{self._location}/{self._filename}.voucher', 'r') as vf:
            voucher = json.load(vf)['content_license']
        self.key = clean_text(voucher['license_response']['key'])
        self.iv = clean_text(voucher['license_response']['iv'])
        content_fmt = clean_text(voucher['content_metadata']['content_reference']['content_format'])
        # filenames look like "<base>-AAX_22_64"; the suffix carries rate info
        filename_suffix = f"-{content_fmt}"
        _, sr, br = filename_suffix.split('_')
        self.input_base_filename = f'{self._location}/{self._filename.replace(filename_suffix, "")}'
        # fixed: glob() returns [] when no cover was downloaded; indexing [-1]
        # unconditionally raised IndexError in that case
        cover_matches = glob(f'{self.input_base_filename}_(*).jpg')
        pic = cover_matches[-1] if cover_matches else None
        self.input_sample_rate = 22050 if sr == '22' else 44100 if sr == '44' else None
        self.input_bit_rate = int(br)
        self.asin = clean_text(voucher['asin'])
        self.metadata = { 'asin': self.asin }
        self.cover_file = pic

    def _load_chapters(self):
        '''Read the -chapters.json sidecar and return a tuple of Chapter entries
        with offsets remapped onto the trimmed output file.'''
        with open(f'{self.input_base_filename}-chapters.json', 'r') as cf:
            chapters_json = json.load(cf)['content_metadata']['chapter_info']
        self.input_start_offset = int(chapters_json['brandIntroDurationMs'])
        self.input_end_offset = int(chapters_json['brandOutroDurationMs'])
        self.input_duration = int(chapters_json['runtime_length_ms'])
        self.output_duration = self.input_duration - self.input_start_offset - self.input_end_offset
        # for chapters mapping source to output:
        # -first chapter must be offset by start offset
        # for chapters referencing output file:
        # -first chapter must start at 0
        # -all other chapters must be offset by start offset
        # both:
        # -first chapter duration must be shortened by start offset
        # -last chapter duration must be shortened by end trim
        def flatten(node: dict, prefix: str = '', chapter_list: 'list[Chapter] | None' = None):
            #FIXME: matroska supports this natively
            #Handles recursively traversing the chapter tree when each book has it's own chapter heading. Produces
            # output like "Book 2: Chapter 3" instead of having multiple "Chapter 3" in a single file if use_combined_chapter_names
            # is True. Multi-book files don't always have nested or even per-book chapters.
            # fixed: previously used a mutable default argument
            if chapter_list is None:
                chapter_list = []
            for item in node:
                index = len(chapter_list)
                title = f"{prefix}{clean_text(item['title'])}"
                offset = int(item['start_offset_ms'])
                duration = int(item['length_ms'])
                if index == 0:
                    duration -= self.input_start_offset
                    chapter_list.append(Chapter(index, title, duration, self.input_start_offset, 0))
                else:
                    chapter_list.append(Chapter(index, title, duration, offset, offset - self.input_start_offset))
                if 'chapters' in item:
                    flatten(item['chapters'], f'{title}: ' if self._combine_chapter_names else '', chapter_list)
            return chapter_list
        chapter_list = flatten(chapters_json['chapters'])
        chapter_list[-1].duration -= self.input_end_offset
        return tuple(chapter_list)

    def _load_metadata(self):
        '''Populate self.metadata and output paths from cached or fetched audnexus data.'''
        m_filename = f'{self.input_base_filename}-metadata.json'
        meta = None
        # fixed: the use_metadata_cache flag was stored but never honored
        if self._use_cache:
            try:
                with open(m_filename, 'r') as meta_file:
                    meta = json.load(meta_file)
            except FileNotFoundError:
                meta = None
        if meta is None:
            with urlopen(f'https://api.audnex.us/books/{self.asin}') as book_json:
                json_s = book_json.read().decode('UTF-8')
                meta = json.loads(json_s)
                with open(m_filename, 'w') as m_file:
                    m_file.write(json_s)
                # avoid hammering audnexus too hard
                # roughly limits to n threads per second requests
                # in worst case
                time.sleep(1/3)
        # sort mononyms last to work around "lastname, firstname" detection in abs
        authors, mononym_authors = [], []
        for author in meta['authors']:
            name = clean_text(author['name'])
            if name in NAMES_REMOVE:
                continue
            for args in NAMES_REPLACE:
                name = name.replace(*args)
            if '\u0020' in name:
                authors.append(name)
            else:
                mononym_authors.append(name)
        narrators = [clean_text(n['name']) for n in meta['narrators']]
        narrators.sort(key=lambda n: '\u0020' not in n)
        # 'tags' is not imported by abs, but we don't want to combine it with genres
        genre, tags = [], []
        for g in meta['genres']:
            name = clean_text(g['name'])
            if g['type'] == 'genre':
                genre.append(name)
            else:
                tags.append(name)
        authors = DELIM_NAME.join(authors + mononym_authors)
        narrators = DELIM_NAME.join(narrators)
        genre = DELIM_GENRE.join(genre)
        tags = DELIM_GENRE.join(tags)
        # restore some of the original formatting - not perfect by any means
        desc = format_audnexus_summary(meta['summary'])
        desc = clean_text(desc, remove_newlines=False)
        self.metadata.update({
            'language': clean_text(meta['language']),
            'artist': authors,
            'composer': narrators,
            'genre': genre,
            'tags': tags,
            'date': clean_text(meta['releaseDate'][:10]),
            'title': clean_text(meta['title']),
            'description': desc,
            'publisher': clean_text(meta['publisherName'])
        })
        has_series = False
        if 'seriesPrimary' in meta and 'position' in meta['seriesPrimary']:
            if 'asin' in meta['seriesPrimary']:
                self.metadata['series-asin'] = clean_text(meta['seriesPrimary']['asin'])
            self.metadata['series'] = clean_text(meta['seriesPrimary']['name'], sub_comma=True)
            self.metadata['series-part'] = clean_text(meta['seriesPrimary']['position'])
            has_series = True
        if 'subtitle' in meta:
            self.metadata['subtitle'] = clean_text(meta['subtitle'])
        #output patterns:
        #normal: author/title/title.extension
        # ex: Bob Bobbyson/Bobbin' It Up/Bobbin' It Up.opus
        #series: author/series/series-part. title/title.extension
        # ex: Bob Bobbyson/Bob's Adventures/2. Bobbin' It Up/Bobbin' It Up.opus
        filename_prefix = clean_filename(self.metadata['title'])
        if has_series:
            series_dir = f"{clean_filename(self.metadata['series'])}/{clean_filename(self.metadata['series-part'])}. "
        else:
            series_dir = ''
        self.output_directory = f'{clean_filename(authors)}/{series_dir}{filename_prefix}'
        self.output_filename = filename_prefix

    def load(self):
        '''Read all sidecar files and remote metadata once; later calls are no-ops.'''
        if self._metadata_loaded:
            return
        self._load_initial()
        self.chapters = self._load_chapters()
        self._load_metadata()
        self._metadata_loaded = True
class AaxcTools:
    '''Stateless helpers that turn a loaded AaxcFile into tagged opus output.'''
    @staticmethod
    def generate_ffmetadata(aaxc: AaxcFile):
        '''Render aaxc metadata plus chapters as an ffmpeg FFMETADATA1 document.'''
        ret = [';FFMETADATA1']
        for key, value in aaxc.metadata.items():
            ret.append(f'{key}={ffmetadata_escape(value)}')
        for c in aaxc.chapters:
            ret.append(FFMPEG_CHAPTER_FMT.format(
                start=c.output_offset,
                end=c.output_offset + c.duration,
                title=ffmetadata_escape(c.title))
            )
        return '\n'.join(ret)
    @staticmethod
    def generate_matroska_metadata(aaxc: AaxcFile) -> tuple[str, str]:
        '''Build the (tags xml, chapters xml) documents consumed by mkvmerge.'''
        tags = []
        chapters = []
        for key, value in aaxc.metadata.items():
            # matroska tag naming uses 'release_date' where vorbis/ffmpeg use 'date'
            if key == 'date':
                key = 'release_date'
            tags.append(MATROSKA_TAG_SIMPLE_FMT.format(key=key, value=html.escape(value)))
        for c in aaxc.chapters:
            kw = {
                'uid': c.index + 1,
                'start': ms_to_fftime(c.output_offset),
                'end': ms_to_fftime(c.output_offset + c.duration),
                'title': html.escape(c.title)
            }
            chapters.append(MATROSKA_CHAPTER_ATOM_FMT.format(**kw))
        tags = MATROSKA_TAGS_XML_FMT.format(tags='\n'.join(tags))
        chapters = MATROSKA_CHAPTERS_XML_FMT.format(atoms='\n'.join(chapters))
        return (tags, chapters)
    @staticmethod
    def generate_opusenc_metadata_args(aaxc: AaxcFile) -> list[str]:
        '''Build opusenc CLI args embedding tags, cover picture, and chapters.'''
        ret = []
        for key, value in aaxc.metadata.items():
            # opusenc has dedicated flags for a few well-known tags
            if key in ('title', 'artist', 'genre', 'date'):
                ret += (f'--{key}', f'{value}')
            else:
                ret += ('--comment', f'{key}={value}')
        if aaxc.cover_file:
            ret += ('--picture', aaxc.cover_file)
        for c in aaxc.chapters:
            ret += ('--comment', f'CHAPTER{c.index:03d}={ms_to_fftime(c.output_offset)}',
                    '--comment', f'CHAPTER{c.index:03d}NAME={c.title}')
        return ret
    @staticmethod
    def transcode(aaxc: AaxcFile, output_dir: str, quality: Quality, container: Container, cancel_event: threading.Event, print_fn = None):
        '''
        Decode one aaxc via ffmpeg, encode to opus via opusenc, then remux into
        the requested container under output_dir.
        Raises OperationCancelled when cancel_event is set, CalledProcessError
        if any child process fails (both via piped_popen).
        '''
        if print_fn is None:
            # default to a no-op logger
            def print_fn(*args):
                pass
        def chk_cancel():
            # raise between pipeline stages once the user has cancelled
            if cancel_event.is_set():
                raise OperationCancelled()
        aaxc.load()
        chk_cancel()
        print_fn(f'Transcode started for asin {aaxc.asin}({aaxc.metadata["title"]})')
        # mirror the destination root's permissions on created subdirectories
        dir_mode = os.stat(output_dir).st_mode
        output_dir = f'{output_dir}/{aaxc.output_directory}/'
        os.makedirs(output_dir, mode=dir_mode, exist_ok=True)
        output_file = f'{output_dir}{aaxc.output_filename}'
        cover_file = f'{output_dir}cover.jpg' #fixme: jpg?
        ogg_filename = f'{output_file}.opus'
        decoder_args = []
        encoder_args = []
        # (path, content) pairs; content None means "delete at the end, don't write"
        temp_files = [(ogg_filename, None)]
        remux_cmds = []
        embedded_cover = True
        match quality:
            case Quality.MONO_VOICE:
                br = '32'
                encoder_args.append('--speech')
                decoder_args = ('-ac', '1')
            case Quality.STEREO_VOICE:
                br = '48'
                encoder_args.append('--speech')
            case Quality.STEREO:
                br = '64'
        encoder_args += ('--bitrate', f'{br}k')
        match container:
            case Container.OGG:
                # opus-in-ogg is the encoder's native output: no remux step needed
                output_file = ogg_filename
                temp_files = []
                encoder_args += AaxcTools.generate_opusenc_metadata_args(aaxc)
            case Container.MP4:
                output_file += '.mp4'
                ffmetadata_filename = f'{output_dir}ffmetadata'
                temp_files.append((ffmetadata_filename, AaxcTools.generate_ffmetadata(aaxc)))
                remux_cmds.append((*FFMPEG_CMD, '-i', ogg_filename,
                    '-i', ffmetadata_filename,
                    '-map_metadata', '1',
                    '-movflags', 'use_metadata_tags',
                    '-movflags', 'faststart',
                    '-codec', 'copy',
                    '-f', 'mp4',
                    output_file))
            case Container.WEBM:
                output_file += ".webm"
                # webm output gets a side-by-side cover.jpg instead of an embedded one
                embedded_cover = False
                temp_webm = f'{output_file}.tmp'
                tags_file = f'{output_dir}tags'
                chapter_file = f'{output_dir}chapters'
                tags, chapters = AaxcTools.generate_matroska_metadata(aaxc)
                temp_files.append((tags_file, tags))
                temp_files.append((chapter_file, chapters))
                temp_files.append((temp_webm, None))
                remux_cmds.append(('mkvmerge', '--output', temp_webm,
                    '--webm',
                    '--quiet',
                    '--global-tags', tags_file,
                    '--chapters', chapter_file,
                    ogg_filename))
                remux_cmds.append(('mkclean', '--quiet',
                    '--remux',
                    '--optimize',
                    temp_webm,
                    output_file))
        # ffmpeg decrypts and trims the audible intro/outro, piping wav to opusenc
        decoder_kwargs = {
            'args': (*FFMPEG_CMD, '-audible_key', aaxc.key,
                '-audible_iv', aaxc.iv,
                '-ss', ms_to_fftime(aaxc.input_start_offset),
                '-t', ms_to_fftime(aaxc.output_duration),
                '-i', aaxc.input_file,
                '-map_metadata', '-1',
                *decoder_args,
                '-f', 'wav',
                '-')
        }
        encoder_kwargs = {
            'args': ('opusenc', '--quiet',
                *encoder_args,
                '-',
                ogg_filename)
        }
        piped_popen(decoder_kwargs, encoder_kwargs, cancel_event)
        chk_cancel()
        if aaxc.cover_file and not embedded_cover:
            copyfile(aaxc.cover_file, cover_file)
        # write the metadata/chapter temp files needed by the remux commands
        for file, content in temp_files:
            if content is None:
                continue
            chk_cancel()
            with open(file, 'w') as f:
                f.write(content)
        chk_cancel()
        for cmd in remux_cmds:
            piped_popen({'args': cmd}, cancel_event=cancel_event)
        # clean up intermediates (including the initial ogg when it was remuxed)
        for file, _ in temp_files:
            os.remove(file)
        print_fn(f'Transcode complete for asin {aaxc.asin}')
# utilities
def format_audnexus_summary(s: str) -> str:
    '''Convert an audnexus html summary into styled plain text.'''
    formatter = AudnexusSummaryFormatter()
    return formatter.process(s)
def clean_filename(input=''):
    '''Swap filesystem-unsafe characters for their unicode look-alikes.'''
    cleaned = input
    for forbidden, replacement in FILENAME_CHAR_MAP:
        cleaned = cleaned.replace(forbidden, replacement)
    return cleaned
def clean_text(input='', sub_comma=False, remove_newlines=True):
    '''Strip html tags and normalize whitespace in a metadata string.'''
    #remove html
    cleaned = re.sub(REGEX_MATCH_HTML, '', input)
    #remove weird whitespace (optionally including newlines)
    ws_pattern = r'\u202F|\u00A0| ' + (r'|\n' if remove_newlines else '')
    cleaned = re.sub(ws_pattern, '\u0020', cleaned).strip()
    if sub_comma:
        # replace comma with SINGLE LOW-9 QUOTATION MARK to prevent commas in series from being used as an item delimiter
        cleaned = cleaned.replace('\u002c', '\u201A')
    return cleaned
def ffmetadata_escape(input=''):
    '''Escape text to conform to ffmetadata escaping rules'''
    # backslash is escaped first so later escapes aren't double-escaped
    escaped = input
    for special in ('\\', '=', ';', '#'):
        escaped = escaped.replace(special, '\\' + special)
    # newlines are emitted as a literal backslash-backslash-n sequence
    return escaped.replace('\n', '\\\\n')
def ms_to_fftime(milliseconds=0):
    '''Return hh:mm:ss.fff format from milliseconds'''
    total_seconds, ms = divmod(milliseconds, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f'{hours:02}:{minutes:02}:{seconds:02}.{ms:03d}'
def pluralize(n, singular, plural_suffix='s') -> str:
    '''Format "<n> <noun>", appending the suffix unless n is exactly 1.'''
    suffix = '' if n == 1 else plural_suffix
    return f'{n} {singular}{suffix}'
def piped_popen(kwargs_a, kwargs_b=None, cancel_event=None) -> None:
    '''
    Executes Popen with kwargs_a and polls until cancel_event is set or the
    process exits.
    If kwargs_b is provided, stdout from a is piped to stdin of b.
    Returns:
        * None
    Raises:
        * CalledProcessError on non-zero exit status
        * OperationCancelled when cancel_event is set
    '''
    def cancelled():
        return cancel_event is not None and cancel_event.is_set()
    if kwargs_b:
        kwargs_a = DEFAULT_POPEN_KWARGS | kwargs_a
        kwargs_b = DEFAULT_POPEN_KWARGS | kwargs_b
        kwargs_a['stdout'] = PIPE
        kwargs_b['stdin'] = PIPE
        with Popen(**kwargs_b) as proc_b:
            with Popen(**kwargs_a) as proc_a:
                # fixed: read until EOF rather than until poll() reports exit.
                # poll() can observe the process finished while buffered output
                # still sits in the pipe, which truncated the data forwarded
                # to proc_b.
                while chunk := proc_a.stdout.read(POPEN_PIPE_CHUNK_SIZE):
                    if cancelled():
                        raise OperationCancelled()
                    proc_b.stdin.write(chunk)
                if proc_a.wait() != 0:
                    raise CalledProcessError(proc_a.returncode, kwargs_a['args'])
            # signal EOF so the consumer can finish
            proc_b.stdin.close()
            while proc_b.poll() is None:
                if cancelled():
                    raise OperationCancelled()
                time.sleep(POLLING_INTERVAL)
            if proc_b.returncode != 0:
                raise CalledProcessError(proc_b.returncode, kwargs_b['args'])
    else:
        with Popen(**(DEFAULT_POPEN_KWARGS | kwargs_a)) as process:
            while process.poll() is None:
                if cancelled():
                    raise OperationCancelled()
                time.sleep(POLLING_INTERVAL)
            if process.returncode:
                raise CalledProcessError(process.returncode, process.args)
class App():
    '''CLI front-end: parses arguments, schedules transcode jobs on a thread
    pool, and prints throttled progress output.'''
    def __init__(self) -> None:
        self._print_lock = threading.RLock()
        self._cancel_event = threading.Event()
        self._printed_progress = False
        self._executor = ThreadPoolExecutor()
        self._job_stats_lock = threading.Lock()
        self._active = 0
        self._finished = 0
        self._failed = 0
        self._queue = []
        # linter helpers
        self.threads: int
        self.container: Container
        self.quality: Quality
        self.quiet: bool
        self.combine_titles: bool
        self.inputs: list[str]
        self.output: str
        # script args setup + parse
        parser = ArgumentParser(prog='abtc',
            description='transcode aaxcs from audible-cli with audnexus metadata')
        parser.add_argument('-t', '--threads',
            type=int,
            default=DEFAULT_THREAD_LIMIT,
            help='max number of processing threads')
        parser.add_argument('-c', '--container',
            type=Container,
            default=Container.WEBM,
            action=EnumAction,
            help='output file container type')
        parser.add_argument('-q', '--quality',
            type=Quality,
            default=Quality.MONO_VOICE,
            action=EnumAction,
            help='output file opus quality')
        parser.add_argument('-s', '--quiet',
            action='store_true',
            help='silence output')
        # fixed: the original passed '-C' '--combine-titles' (missing comma),
        # which string-concatenated into one bogus option '-C--combine-titles'
        parser.add_argument('-C', '--combine-titles',
            dest='combine_titles',
            action='store_true',
            help="combine parent chapter title with child's when nested")
        parser.add_argument('output',
            help='output directory')
        parser.add_argument('inputs',
            nargs='+',
            help='input file(s) or a single directory')
        # parse directly onto this instance (see the linter helpers above)
        parser.parse_args(namespace=self)

    @property
    def cancelled(self):
        '''True once cancellation has been requested.'''
        return self._cancel_event.is_set()

    def _job_done_cb(self, future: Future):
        '''Future completion callback: update counters and report the outcome.'''
        with self._job_stats_lock:
            self._active -= 1
            self._finished += 1
        if self.cancelled:
            return
        exc = future.exception()
        msg = None
        if exc:
            #can't avoid broken pipe error upon interruption during stdio
            if isinstance(exc, (OperationCancelled, BrokenPipeError)):
                pass
            else:
                self._failed += 1
                if isinstance(exc, CalledProcessError):
                    cmd = ' '.join(quote(arg) for arg in exc.cmd)
                    msg = f'Exec failed with code {exc.returncode}: "{cmd}"'
                else:
                    msg = f'Something broke...\n{"".join(traceback.format_exception(exc))}'
        elif msg := future.result():
            # a job may return a printable message instead of raising
            pass
        if msg:
            self.print(msg)

    def cancel(self):
        '''Request cancellation of queued and running jobs (idempotent).'''
        if self.cancelled:
            return
        self._cancel_event.set()
        self._executor.shutdown(wait=False, cancel_futures=True)

    def main(self) -> int:
        '''Run the scheduling loop; returns 0 on success, 1 on failure/cancel.'''
        self._start_time = datetime.now()
        self.print(f'Started with {pluralize(len(self.inputs), "input")}')
        progress_loops = PROGRESS_INTERVAL / POLLING_INTERVAL
        i = progress_loops  # start "due" so progress prints on the first pass
        self._queue = [AaxcFile(i, self.combine_titles) for i in self.inputs]
        return_code = 0
        while True:
            i += 1
            if i >= progress_loops:
                self.print(progress=True)
                i = 0
            # top the pool up to the configured thread limit
            while self._active < self.threads and len(self._queue):
                self._active += 1
                tc_args = (self.output, self.quality, self.container, self._cancel_event, self.print)
                future = self._executor.submit(AaxcTools.transcode, self._queue.pop(), *tc_args)
                future.add_done_callback(self._job_done_cb)
            if not self._active and not len(self._queue):
                break
            if self._cancel_event.wait(POLLING_INTERVAL):
                return_code = 1
                break
        if self._failed:
            return_code = 1
        end_time = datetime.now()
        duration = (end_time - self._start_time).total_seconds()
        status_str = f'with {pluralize(self._failed, "failure")}' if self._failed else '(cancelled)' if self.cancelled else 'successfully'
        self.print(f'Finished {status_str}, elapsed: {duration:.3f}s')
        return return_code

    def print(self, *args, progress=False, **kwargs):
        '''Thread-safe print that keeps one progress line pinned at the bottom.'''
        if self.quiet:
            return
        with self._print_lock:
            reprint_progress = False
            clear_progress = self._printed_progress
            if progress:
                args = self._gen_progress()
                self._printed_progress = True
            else:
                # a normal message displaces the progress line; re-print it after
                reprint_progress = self._printed_progress and not self.cancelled
                self._printed_progress = False
            have_args = len(args) > 0
            if have_args:
                args = (f'[{datetime.now().strftime(PRINT_TIME_FMT)}]', *args)
            if clear_progress:
                # ANSI: move up one line and erase the stale progress output
                args = (f'\r\033[1A\033[0K{args[0]}', *args[1:]) if have_args else ('\r\033[1A\033[0K', )
            print(*args, **kwargs)
            if reprint_progress and self._active:
                self.print(progress=True)

    def _gen_progress(self):
        '''Build the single-line progress summary as a 1-tuple for print().'''
        elapsed = (datetime.now() - self._start_time).total_seconds()
        run_str = f'{self._active} running'
        q_str = f'{len(self._queue)} queued'
        fail_str = f'({pluralize(self._failed, "failure")})' if self._failed else ''
        finish_str = f'{self._finished} finished'
        return (f'> {elapsed:.0f}s | {run_str} | {q_str} | {finish_str}{fail_str}',)
# main
if __name__ == "__main__":
    app = App()
    # output must be an existing directory; transcode() creates subdirs beneath it
    if not os.path.isdir(app.output):
        app.print(f'Error: output is not a directory: {app.output}')
        sys.exit(1)
    # a single directory input expands to every .aaxc file inside it
    if os.path.isdir(app.inputs[0]):
        if len(app.inputs) > 1:
            app.print('Warning: ignoring additional inputs in directory input mode')
        input_files = glob(f'{app.inputs[0]}/*.aaxc')
        if input_files:
            app.inputs = input_files
        else:
            app.print(f'Error: input directory contains no aaxc files: {app.inputs[0]}')
            sys.exit(1)
    else:
        # otherwise every listed input must be an existing file
        for file in app.inputs:
            if not os.path.isfile(file):
                app.print(f'Error: input file not found: {file}')
                sys.exit(1)
    # Ctrl-C requests a graceful cancel instead of killing jobs mid-write
    signal(SIGINT, lambda *_: app.cancel())
    sys.exit(app.main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment