-
-
Save itzexor/46a6224fba7a430344799842c3bc79b6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from glob import glob | |
| from shlex import quote | |
| from shutil import copyfile | |
| from subprocess import Popen, PIPE, DEVNULL, CalledProcessError | |
| from concurrent.futures import ThreadPoolExecutor, Future | |
| from argparse import ArgumentParser | |
| from signal import signal, SIGINT | |
| from enum import Enum, auto | |
| import threading | |
| from urllib.request import urlopen | |
| from html.parser import HTMLParser | |
| import html | |
| import json | |
| import os | |
| import sys | |
| import argparse | |
| import re | |
| import traceback | |
| import time | |
DEFAULT_THREAD_LIMIT = 4
'''Default max simultaneous jobs'''
DELIM_NAME = ', '
'''Name-type metadata field delimiter'''
DELIM_GENRE = '; '
'''Genre-type metadata field delimiter'''
#some specific jank to remove
# fix: trailing comma added - without it this was a plain string, so the
# membership test in AaxcFile._load_metadata() did substring matching
NAMES_REMOVE = (
    'The Great Courses',
)
'''Exact name matches to remove entirely'''
NAMES_REPLACE = (
    (' - introductions', ''),
    ('James S.A. Corey', 'James S. A. Corey'),
    ('eden Hudson', 'Eden Hudson')
)
'''(string, replace) pairs to replace within names'''
FFMPEG_CMD = ('ffmpeg', '-loglevel', 'error')
'''Base ffmpeg invocation shared by decode and remux steps'''
DEFAULT_POPEN_KWARGS = {
    'bufsize': 0,
    'pipesize': 1048576,
    'stderr': DEVNULL,
    'stdin': DEVNULL,
    'stdout': DEVNULL
}
'''Baseline Popen kwargs; piped_popen() overrides stdin/stdout as needed'''
POPEN_PIPE_CHUNK_SIZE = 65535
'''In-app read chunk size from decoder pipe'''
POLLING_INTERVAL = 1/10
'''General polling interval in seconds'''
PROGRESS_INTERVAL = 1
'''Progress printing interval in seconds'''
REGEX_MATCH_HTML = r'<\/?[\w\s]*>|<.+[\W]>'
'''Rough pattern used by clean_text() to strip html tags'''
FILENAME_CHAR_MAP = (
    ('<', '﹤'),
    ('>', '﹥'),
    (':', 'ː'),
    ('"', '“'),
    ('/', '⁄'),
    ('|', '⼁'),
    ('?', '﹖'),
    ('*', '﹡'),
    ('\\', '∖')
)
'''Filesystem-unsafe characters mapped to visually-similar unicode stand-ins'''
# fix: '%m' is month-of-year in strftime; '%M' (minutes) is what an
# H:M:S timestamp needs
PRINT_TIME_FMT = '%H:%M:%S'
'''App().print() timestamping format'''
FFMPEG_CHAPTER_FMT = \
'''[CHAPTER]
TIMEBASE=1/1000
START={start}
END={end}
TITLE={title}'''
'''FFMPEG FFMETADATA chapter template'''
MATROSKA_TAGS_XML_FMT = \
'''<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Tags SYSTEM "matroskatags.dtd">
<Tags>
{tags}
</Tags>
'''
'''Matroska tags file template'''
MATROSKA_TAG_SIMPLE_FMT = \
'''  <Tag>
    <Simple>
      <Name>{key}</Name>
      <String>{value}</String>
    </Simple>
  </Tag>'''
'''Matroska simple tag template'''
MATROSKA_CHAPTERS_XML_FMT = \
'''<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Chapters SYSTEM "matroskachapters.dtd">
<Chapters>
  <EditionEntry>
{atoms}
  </EditionEntry>
</Chapters>
'''
'''Matroska chapters file template'''
MATROSKA_CHAPTER_ATOM_FMT = \
'''    <ChapterAtom>
      <ChapterUID>{uid}</ChapterUID>
      <ChapterTimeStart>{start}</ChapterTimeStart>
      <ChapterTimeEnd>{end}</ChapterTimeEnd>
      <ChapterDisplay>
        <ChapterString>{title}</ChapterString>
      </ChapterDisplay>
    </ChapterAtom>'''
'''Matroska chapter atom template'''
class OperationCancelled(Exception):
    '''Raised when an operation is interrupted by user cancellation (cancel_event set / SIGINT)'''
    pass
class MetadataNotLoaded(Exception):
    '''Metadata was required before AaxcFile.load() ran (NOTE(review): not raised anywhere in this file)'''
    pass
class StrEnum(Enum):
    '''Enum base whose members stringify to their value, with auto() producing kebab-case values.'''

    def __str__(self):
        return self.value

    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        # SOME_MEMBER -> "some-member"
        return name.replace('_', '-').lower()
# https://stackoverflow.com/a/60750535
class EnumAction(argparse.Action):
    '''
    Argparse action that presents an Enum's values as choices and stores the
    matching Enum member on the namespace.
    '''
    def __init__(self, **kwargs):
        # the Enum subclass arrives via the type= keyword
        enum_type = kwargs.pop('type', None)
        if enum_type is None:
            raise ValueError('type must be assigned an Enum when using EnumAction')
        if not issubclass(enum_type, Enum):
            raise TypeError('type must be an Enum when using EnumAction')
        # offer each member's value on the command line unless overridden
        kwargs.setdefault('choices', tuple(member.value for member in enum_type))
        super().__init__(**kwargs)
        self._enum = enum_type

    def __call__(self, parser, namespace, values, option_string=None):
        # map the chosen string back to its Enum member
        setattr(namespace, self.dest, self._enum(values))
class Container(StrEnum):
    '''Output container format'''
    # explicit values, identical to what auto() generated via StrEnum
    MP4 = 'mp4'
    '''opus in mp4 container, "m4b" file extension'''
    OGG = 'ogg'
    '''opus in ogg container, "opus" file extension'''
    WEBM = 'webm'
    '''opus in webm container, "webm" file extension'''
class Quality(StrEnum):
    '''Output opus quality setting'''
    # explicit values, identical to what auto() generated via StrEnum
    MONO_VOICE = 'mono-voice'
    '''mono 32k voice mode'''
    STEREO_VOICE = 'stereo-voice'
    '''stereo 48k voice mode'''
    STEREO = 'stereo'
    '''stereo 64k auto'''
@dataclass
class Chapter:
    '''Represents a single chapter within an AaxcFile()'''
    index: int
    '''Zero-based chapter index'''
    title: str
    '''Chapter title'''
    duration: int
    '''Chapter duration in milliseconds'''
    input_offset: int
    '''Start offset (ms) relative to AaxcFile() input file'''
    output_offset: int
    '''Start offset (ms) relative to AaxcFile() output file (intro trim removed)'''
| class AudnexusSummaryFormatter(HTMLParser): | |
| ''' | |
| Converts an audnexus/audible summary from html to plain text with stylized unicode | |
| character based formatting. | |
| ''' | |
| UNI_FORMAT_MAP = { | |
| 'keys': 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', | |
| 'value_sets': { | |
| 'b': '𝐚𝐛𝐜𝐝𝐞𝐟𝐠𝐡𝐢𝐣𝐤𝐥𝐦𝐧𝐨𝐩𝐪𝐫𝐬𝐭𝐮𝐯𝐰𝐱𝐲𝐳𝐀𝐁𝐂𝐃𝐄𝐅𝐆𝐇𝐈𝐉𝐊𝐋𝐌𝐍𝐎𝐏𝐐𝐑𝐒𝐓𝐔𝐕𝐖𝐗𝐘𝐙𝟎𝟏𝟐𝟑𝟒𝟓𝟔𝟕𝟖𝟗', | |
| 'i': '𝑎𝑏𝑐𝑑𝑒𝑓𝑔ℎ𝑖𝑗𝑘𝑙𝑚𝑛𝑜𝑝𝑞𝑟𝑠𝑡𝑢𝑣𝑤𝑥𝑦𝑧𝐴𝐵𝐶𝐷𝐸𝐹𝐺𝐻𝐼𝐽𝐾𝐿𝑀𝑁𝑂𝑃𝑄𝑅𝑆𝑇𝑈𝑉𝑊𝑋𝑌𝑍0123456789', | |
| 'bi': '𝒂𝒃𝒄𝒅𝒆𝒇𝒈𝒉𝒊𝒋𝒌𝒍𝒎𝒏𝒐𝒑𝒒𝒓𝒔𝒕𝒖𝒗𝒘𝒙𝒚𝒛𝑨𝑩𝑪𝑫𝑬𝑭𝑮𝑯𝑰𝑱𝑲𝑳𝑴𝑵𝑶𝑷𝑸𝑹𝑺𝑻𝑼𝑽𝑾𝑿𝒀𝒁𝟎𝟏𝟐𝟑𝟒𝟓𝟔𝟕𝟖𝟗' | |
| } | |
| } | |
| '''HTML-to-Unicode map''' | |
| def __init__(self, *, convert_charrefs: bool = True) -> None: | |
| self.states = {} | |
| self.buffer = '' | |
| self.needs_newlines = False | |
| super().__init__(convert_charrefs=convert_charrefs) | |
| def get_state(self, key): | |
| return not not self.states[key] if key in self.states else False | |
| def set_state(self, key, state): | |
| if key in self.states: | |
| if state: | |
| self.states[key] += 1 | |
| elif self.states[key]: | |
| self.states[key] -= 1 | |
| else: | |
| self.states[key] = 1 if state else 0 | |
| def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: | |
| self.set_state(tag, True) | |
| def handle_endtag(self, tag: str) -> None: | |
| self.set_state(tag, False) | |
| if tag == 'p' and not self.get_state('p'): | |
| self.needs_newlines = True | |
| def handle_data(self, data: str) -> None: | |
| if self.needs_newlines and self.get_state('p'): | |
| self.buffer += '\n\n' | |
| self.needs_newlines = False | |
| style = 'b' if self.get_state('b') else '' | |
| if self.get_state('i'): | |
| style = f'{style}i' | |
| if style: | |
| for char in data: | |
| if (i := AudnexusSummaryFormatter.UNI_FORMAT_MAP['keys'].find(char)) != -1: | |
| self.buffer += AudnexusSummaryFormatter.UNI_FORMAT_MAP['value_sets'][style][i] | |
| else: | |
| self.buffer += char | |
| else: | |
| self.buffer += data | |
| def process(self, input=''): | |
| self.buffer = '' | |
| self.states = {} | |
| self.feed(input) | |
| return self.buffer | |
class AaxcFile:
    '''
    A single audible-cli aaxc download: the audio file plus its sibling voucher,
    chapters, metadata and cover files, with lazily-loaded parsed metadata.
    '''
    def __init__(self, aaxc_path: str, combine_chapter_names = False, use_metadata_cache = True) -> None:
        self._use_cache = use_metadata_cache
        # NOTE(review): _use_cache is stored but never consulted; a cached
        # metadata json is always preferred when present
        self._combine_chapter_names = combine_chapter_names
        self._metadata_loaded = False
        self._location, self._filename = os.path.split(aaxc_path)
        self._filename, _ = os.path.splitext(self._filename)
        self.input_file = aaxc_path
        '''input aaxc file'''
        self.input_sample_rate: int | None
        '''aaxc sample rate'''
        self.input_bit_rate: int
        '''aaxc bit rate'''
        self.input_duration: int
        '''input file duration'''
        self.input_start_offset: int
        '''input file audible intro length'''
        self.input_end_offset: int
        '''input file audible outro length'''
        self.key: str
        '''aaxc encryption key'''
        self.iv: str
        '''aaxc encryption initialization vector'''
        self.asin: str
        '''audible asin'''
        self.input_base_filename: str
        '''common filename prefix between aaxc, voucher, chapters, and cover files'''
        self.cover_file: str
        '''cover jpg file associated with this aaxc, if any'''
        self.metadata: dict[str, str]
        '''dict containing metadata tags for the output file(s), only contains 'asin' until metadata import'''
        self.output_duration = None
        '''output file duration'''
        self.output_filename = None
        '''output filename without an extension, only valid after metadata import'''
        self.output_directory = None
        '''full destination output directory, only valid after metadata import'''
        self.chapters: tuple[Chapter, ...]
        '''a tuple containing Chapter() entries'''

    def _load_initial(self):
        '''Parse the voucher for the decryption key/iv and locate sibling files.'''
        with open(f'{self._location}/{self._filename}.voucher', 'r') as vf:
            voucher = json.load(vf)['content_license']
        self.key = clean_text(voucher['license_response']['key'])
        self.iv = clean_text(voucher['license_response']['iv'])
        content_fmt = clean_text(voucher['content_metadata']['content_reference']['content_format'])
        filename_suffix = f"-{content_fmt}"
        # content format looks like "<codec>_<sr>_<br>", e.g. "AAX_22_64"
        _, sr, br = filename_suffix.split('_')
        self.input_base_filename = f'{self._location}/{self._filename.replace(filename_suffix, "")}'
        # fix: the original indexed glob(...)[-1] unconditionally, which raised
        # IndexError when no cover jpg was downloaded alongside the aaxc
        covers = glob(f'{self.input_base_filename}_(*).jpg')
        self.input_sample_rate = 22050 if sr == '22' else 44100 if sr == '44' else None
        self.input_bit_rate = int(br)
        self.asin = clean_text(voucher['asin'])
        self.metadata = { 'asin': self.asin }
        self.cover_file = covers[-1] if covers else None

    def _load_chapters(self):
        '''Parse the chapters json into a tuple of Chapter() entries, adjusting
        offsets/durations for the trimmed audible intro and outro.'''
        with open(f'{self.input_base_filename}-chapters.json', 'r') as cf:
            chapters_json = json.load(cf)['content_metadata']['chapter_info']
        self.input_start_offset = int(chapters_json['brandIntroDurationMs'])
        self.input_end_offset = int(chapters_json['brandOutroDurationMs'])
        self.input_duration = int(chapters_json['runtime_length_ms'])
        self.output_duration = self.input_duration - self.input_start_offset - self.input_end_offset
        # for chapters mapping source to output:
        # -first chapter must be offset by start offset
        # for chapters referencing output file:
        # -first chapter must start at 0
        # -all other chapters must be offset by start offset
        # both:
        # -first chapter duration must be shortened by start offset
        # -last chapter duration must be shortened by end trim
        def flatten(node: dict, prefix: str = '', chapter_list: list[Chapter] | None = None):
            #FIXME: matroska supports this natively
            #Handles recursively traversing the chapter tree when each book has it's own chapter heading. Produces
            # output like "Book 2: Chapter 3" instead of having multiple "Chapter 3" in a single file if
            # combine_chapter_names is True. Multi-book files don't always have nested or even per-book chapters.
            # (mutable default argument replaced with a None sentinel)
            if chapter_list is None:
                chapter_list = []
            for item in node:
                index = len(chapter_list)
                title = f"{prefix}{clean_text(item['title'])}"
                offset = int(item['start_offset_ms'])
                duration = int(item['length_ms'])
                if index == 0:
                    duration -= self.input_start_offset
                    chapter_list.append(Chapter(index, title, duration, self.input_start_offset, 0))
                else:
                    chapter_list.append(Chapter(index, title, duration, offset, offset - self.input_start_offset))
                if 'chapters' in item:
                    flatten(item['chapters'], f'{title}: ' if self._combine_chapter_names else '', chapter_list)
            return chapter_list
        chapter_list = flatten(chapters_json['chapters'])
        chapter_list[-1].duration -= self.input_end_offset
        return tuple(chapter_list)

    def _load_metadata(self):
        '''Populate self.metadata and output paths from the cached metadata json,
        fetching it from audnexus (and caching it) on first run.'''
        m_filename = f'{self.input_base_filename}-metadata.json'
        try:
            with open(m_filename, 'r') as meta_file:
                meta = json.load(meta_file)
        except FileNotFoundError:
            with urlopen(f'https://api.audnex.us/books/{self.asin}') as book_json:
                json_s = book_json.read().decode('UTF-8')
            meta = json.loads(json_s)
            with open(m_filename, 'w') as m_file:
                m_file.write(json_s)
            # avoid hammering audnexus too hard
            # roughly limits to n threads per second requests
            # in worst case
            time.sleep(1/3)
        # sort mononyms last to work around "lastname, firstname" detection in abs
        authors, mononym_authors = [], []
        for author in meta['authors']:
            name = clean_text(author['name'])
            if name in NAMES_REMOVE:
                continue
            for args in NAMES_REPLACE:
                name = name.replace(*args)
            if '\u0020' in name:
                authors.append(name)
            else:
                mononym_authors.append(name)
        narrators = [clean_text(n['name']) for n in meta['narrators']]
        narrators.sort(key=lambda n: '\u0020' not in n)
        # 'tags' is not imported by abs, but we don't want to combine it with genres
        genre, tags = [], []
        for g in meta['genres']:
            name = clean_text(g['name'])
            if g['type'] == 'genre':
                genre.append(name)
            else:
                tags.append(name)
        authors = DELIM_NAME.join(authors + mononym_authors)
        narrators = DELIM_NAME.join(narrators)
        genre = DELIM_GENRE.join(genre)
        tags = DELIM_GENRE.join(tags)
        # restore some of the original formatting - not perfect by any means
        desc = format_audnexus_summary(meta['summary'])
        desc = clean_text(desc, remove_newlines=False)
        self.metadata.update({
            'language': clean_text(meta['language']),
            'artist': authors,
            'composer': narrators,
            'genre': genre,
            'tags': tags,
            'date': clean_text(meta['releaseDate'][:10]),
            'title': clean_text(meta['title']),
            'description': desc,
            'publisher': clean_text(meta['publisherName'])
        })
        has_series = False
        if 'seriesPrimary' in meta and 'position' in meta['seriesPrimary']:
            if 'asin' in meta['seriesPrimary']:
                self.metadata['series-asin'] = clean_text(meta['seriesPrimary']['asin'])
            self.metadata['series'] = clean_text(meta['seriesPrimary']['name'], sub_comma=True)
            self.metadata['series-part'] = clean_text(meta['seriesPrimary']['position'])
            has_series = True
        if 'subtitle' in meta:
            self.metadata['subtitle'] = clean_text(meta['subtitle'])
        #output patterns:
        #normal: author/title/title.extension
        #  ex: Bob Bobbyson/Bobbin' It Up/Bobbin' It Up.opus
        #series: author/series/series-part. title/title.extension
        #  ex: Bob Bobbyson/Bob's Adventures/2. Bobbin' It Up/Bobbin' It Up.opus
        filename_prefix = clean_filename(self.metadata['title'])
        if has_series:
            series_dir = f"{clean_filename(self.metadata['series'])}/{clean_filename(self.metadata['series-part'])}. "
        else:
            series_dir = ''
        self.output_directory = f'{clean_filename(authors)}/{series_dir}{filename_prefix}'
        self.output_filename = filename_prefix

    def load(self):
        '''Load voucher, chapters, and metadata once; subsequent calls are no-ops.'''
        if self._metadata_loaded:
            return
        self._load_initial()
        self.chapters = self._load_chapters()
        self._load_metadata()
        self._metadata_loaded = True
class AaxcTools:
    '''Stateless helpers that turn an AaxcFile into metadata documents/args and run the transcode pipeline.'''
    @staticmethod
    def generate_ffmetadata(aaxc: AaxcFile):
        '''Render aaxc metadata + chapters as one FFMETADATA1 document string.'''
        ret = [';FFMETADATA1']
        for key, value in aaxc.metadata.items():
            ret.append(f'{key}={ffmetadata_escape(value)}')
        for c in aaxc.chapters:
            ret.append(FFMPEG_CHAPTER_FMT.format(
                start=c.output_offset,
                end=c.output_offset + c.duration,
                title=ffmetadata_escape(c.title))
            )
        return '\n'.join(ret)

    @staticmethod
    def generate_matroska_metadata(aaxc: AaxcFile) -> tuple[str, str]:
        '''Render aaxc metadata + chapters as matroska XML; returns (tags_xml, chapters_xml).'''
        tags = []
        chapters = []
        for key, value in aaxc.metadata.items():
            # matroska's tag name for the release date differs from ffmpeg's 'date'
            if key == 'date':
                key = 'release_date'
            tags.append(MATROSKA_TAG_SIMPLE_FMT.format(key=key, value=html.escape(value)))
        for c in aaxc.chapters:
            kw = {
                'uid': c.index + 1,
                'start': ms_to_fftime(c.output_offset),
                'end': ms_to_fftime(c.output_offset + c.duration),
                'title': html.escape(c.title)
            }
            chapters.append(MATROSKA_CHAPTER_ATOM_FMT.format(**kw))
        tags = MATROSKA_TAGS_XML_FMT.format(tags='\n'.join(tags))
        chapters = MATROSKA_CHAPTERS_XML_FMT.format(atoms='\n'.join(chapters))
        return (tags, chapters)

    @staticmethod
    def generate_opusenc_metadata_args(aaxc: AaxcFile) -> list[str]:
        '''Build opusenc CLI args embedding tags, cover art, and CHAPTERnnn comments.'''
        ret = []
        for key, value in aaxc.metadata.items():
            # opusenc has dedicated flags for these four; everything else is a generic comment
            if key in ('title', 'artist', 'genre', 'date'):
                ret += (f'--{key}', f'{value}')
            else:
                ret += ('--comment', f'{key}={value}')
        if aaxc.cover_file:
            ret += ('--picture', aaxc.cover_file)
        for c in aaxc.chapters:
            ret += ('--comment', f'CHAPTER{c.index:03d}={ms_to_fftime(c.output_offset)}',
                    '--comment', f'CHAPTER{c.index:03d}NAME={c.title}')
        return ret

    @staticmethod
    def transcode(aaxc: AaxcFile, output_dir: str, quality: Quality, container: Container, cancel_event: threading.Event, print_fn = None):
        '''
        Decrypt/trim the aaxc with ffmpeg, encode to opus, then remux/tag per container.
        Raises OperationCancelled when cancel_event is set, CalledProcessError on tool failure.
        '''
        if print_fn is None:
            # default to a no-op logger
            def print_fn(*args):
                pass
        def chk_cancel():
            if cancel_event.is_set():
                raise OperationCancelled()
        aaxc.load()
        chk_cancel()
        print_fn(f'Transcode started for asin {aaxc.asin}({aaxc.metadata["title"]})')
        # mirror the destination root's permissions onto created subdirectories
        dir_mode = os.stat(output_dir).st_mode
        output_dir = f'{output_dir}/{aaxc.output_directory}/'
        os.makedirs(output_dir, mode=dir_mode, exist_ok=True)
        output_file = f'{output_dir}{aaxc.output_filename}'
        cover_file = f'{output_dir}cover.jpg' #fixme: jpg?
        ogg_filename = f'{output_file}.opus'
        decoder_args = []
        encoder_args = []
        # (path, content) pairs; content None means "intermediate file, delete after remux"
        temp_files = [(ogg_filename, None)]
        remux_cmds = []
        embedded_cover = True
        match quality:
            case Quality.MONO_VOICE:
                br = '32'
                encoder_args.append('--speech')
                decoder_args = ('-ac', '1')
            case Quality.STEREO_VOICE:
                br = '48'
                encoder_args.append('--speech')
            case Quality.STEREO:
                br = '64'
        encoder_args += ('--bitrate', f'{br}k')
        match container:
            case Container.OGG:
                # opusenc writes the final file directly, tags included; nothing to remux
                output_file = ogg_filename
                temp_files = []
                encoder_args += AaxcTools.generate_opusenc_metadata_args(aaxc)
            case Container.MP4:
                output_file += '.mp4'
                ffmetadata_filename = f'{output_dir}ffmetadata'
                temp_files.append((ffmetadata_filename, AaxcTools.generate_ffmetadata(aaxc)))
                remux_cmds.append((*FFMPEG_CMD, '-i', ogg_filename,
                                   '-i', ffmetadata_filename,
                                   '-map_metadata', '1',
                                   '-movflags', 'use_metadata_tags',
                                   '-movflags', 'faststart',
                                   '-codec', 'copy',
                                   '-f', 'mp4',
                                   output_file))
            case Container.WEBM:
                output_file += ".webm"
                # cover art is copied alongside instead of embedded for webm
                embedded_cover = False
                temp_webm = f'{output_file}.tmp'
                tags_file = f'{output_dir}tags'
                chapter_file = f'{output_dir}chapters'
                tags, chapters = AaxcTools.generate_matroska_metadata(aaxc)
                temp_files.append((tags_file, tags))
                temp_files.append((chapter_file, chapters))
                temp_files.append((temp_webm, None))
                remux_cmds.append(('mkvmerge', '--output', temp_webm,
                                   '--webm',
                                   '--quiet',
                                   '--global-tags', tags_file,
                                   '--chapters', chapter_file,
                                   ogg_filename))
                remux_cmds.append(('mkclean', '--quiet',
                                   '--remux',
                                   '--optimize',
                                   temp_webm,
                                   output_file))
        # ffmpeg decrypts + trims intro/outro to wav on stdout; opusenc encodes from stdin
        decoder_kwargs = {
            'args': (*FFMPEG_CMD, '-audible_key', aaxc.key,
                     '-audible_iv', aaxc.iv,
                     '-ss', ms_to_fftime(aaxc.input_start_offset),
                     '-t', ms_to_fftime(aaxc.output_duration),
                     '-i', aaxc.input_file,
                     '-map_metadata', '-1',
                     *decoder_args,
                     '-f', 'wav',
                     '-')
        }
        encoder_kwargs = {
            'args': ('opusenc', '--quiet',
                     *encoder_args,
                     '-',
                     ogg_filename)
        }
        piped_popen(decoder_kwargs, encoder_kwargs, cancel_event)
        chk_cancel()
        if aaxc.cover_file and not embedded_cover:
            copyfile(aaxc.cover_file, cover_file)
        # write the metadata temp files needed by the remux commands
        for file, content in temp_files:
            if content is None:
                continue
            chk_cancel()
            with open(file, 'w') as f:
                f.write(content)
        chk_cancel()
        for cmd in remux_cmds:
            piped_popen({'args': cmd}, cancel_event=cancel_event)
        # temp files (including the intermediate opus) are only removed on success
        for file, _ in temp_files:
            os.remove(file)
        print_fn(f'Transcode complete for asin {aaxc.asin}')
# utilities
def format_audnexus_summary(s: str) -> str:
    '''Convert an audnexus html summary to styled plain text.'''
    formatter = AudnexusSummaryFormatter()
    return formatter.process(s)
def clean_filename(input=''):
    '''Substitute filesystem-unsafe characters per FILENAME_CHAR_MAP.'''
    result = input
    for unsafe, lookalike in FILENAME_CHAR_MAP:
        result = result.replace(unsafe, lookalike)
    return result
def clean_text(input='', sub_comma=False, remove_newlines=True):
    '''Strip html tags and odd whitespace from *input*; optionally collapse
    newlines and substitute commas (used for series names).'''
    #remove html
    cleaned = re.sub(REGEX_MATCH_HTML, '', input)
    #remove weird whitespace
    ws_pattern = r'\u202F|\u00A0| ' + (r'|\n' if remove_newlines else '')
    cleaned = re.sub(ws_pattern, '\u0020', cleaned).strip()
    if sub_comma:
        # replace comma with SINGLE LOW-9 QUOTATION MARK to prevent commas in series from being used as an item delimiter
        cleaned = cleaned.replace('\u002c', '\u201A')
    return cleaned
def ffmetadata_escape(input=''):
    '''Escape text to conform to ffmetadata escaping rules.

    Per the FFMETADATA1 format, '=', ';', '#', '\\' and newline must each be
    escaped with a preceding backslash. Backslash is handled first so the
    backslashes added by the later escapes are not themselves re-escaped.
    '''
    for s in (r'\=;#'):
        input = input.replace(s, f'\\{s}')
    # fix: a newline is escaped by a backslash immediately BEFORE it; the
    # original replaced it with the two characters '\' + 'n', which ffmpeg
    # reads as an escaped literal 'n' and the line break is lost
    return input.replace('\n', '\\\n')
def ms_to_fftime(milliseconds=0):
    '''Return hh:mm:ss.fff format from milliseconds'''
    total_seconds, millis = divmod(milliseconds, 1000)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f'{hours:02}:{minutes:02}:{seconds:02}.{millis:03d}'
def pluralize(n, singular, plural_suffix='s') -> str:
    '''Return "<n> <singular>", appending plural_suffix unless n == 1.'''
    suffix = '' if n == 1 else plural_suffix
    return f'{n} {singular}{suffix}'
def piped_popen(kwargs_a, kwargs_b=None, cancel_event=None) -> None:
    '''
    Executes Popen with kwargs_a and runs it until cancel_event is set or
    the process exits.
    If kwargs_b is provided, stdout from a is piped to stdin of b.
    Returns:
        * None
    Raises:
        * CalledProcessError on non-zero exit status
        * OperationCancelled when cancel_event is set
    '''
    def chk_cancel():
        if cancel_event and cancel_event.is_set():
            raise OperationCancelled()
    if kwargs_b:
        kwargs_a = DEFAULT_POPEN_KWARGS | kwargs_a
        kwargs_b = DEFAULT_POPEN_KWARGS | kwargs_b
        kwargs_a['stdout'] = PIPE
        kwargs_b['stdin'] = PIPE
        with Popen(**kwargs_b) as proc_b:
            with Popen(**kwargs_a) as proc_a:
                # fix: read until EOF rather than until poll() reports exit;
                # the original loop stopped as soon as the producer exited,
                # which could drop data still buffered in the pipe
                while chunk := proc_a.stdout.read(POPEN_PIPE_CHUNK_SIZE):
                    chk_cancel()
                    proc_b.stdin.write(chunk)
                if proc_a.wait() != 0:
                    raise CalledProcessError(proc_a.returncode, kwargs_a['args'])
                # signal EOF to the consumer, then wait for it to finish
                proc_b.stdin.close()
                while proc_b.poll() is None:
                    chk_cancel()
                    time.sleep(POLLING_INTERVAL)
                if proc_b.returncode != 0:
                    raise CalledProcessError(proc_b.returncode, kwargs_b['args'])
    else:
        with Popen(**(DEFAULT_POPEN_KWARGS | kwargs_a)) as process:
            while process.poll() is None:
                chk_cancel()
                time.sleep(POLLING_INTERVAL)
            if process.returncode:
                raise CalledProcessError(process.returncode, process.args)
class App():
    '''CLI front-end: parses arguments, schedules transcode jobs on a thread
    pool, and prints timestamped progress/status lines.'''
    def __init__(self) -> None:
        self._print_lock = threading.RLock()
        self._cancel_event = threading.Event()
        self._printed_progress = False
        self._executor = ThreadPoolExecutor()
        self._job_stats_lock = threading.Lock()
        self._active = 0
        self._finished = 0
        self._failed = 0
        self._queue = []
        # linter helpers
        self.threads: int
        self.container: Container
        self.quality: Quality
        self.quiet: bool
        self.combine_titles: bool
        self.inputs: list[str]
        self.output: str
        # script args setup + parse
        parser = ArgumentParser(prog='abtc',
                                description='transcode aaxcs from audible-cli with audnexus metadata')
        parser.add_argument('-t', '--threads',
                            type=int,
                            default=DEFAULT_THREAD_LIMIT,
                            help='max number of processing threads')
        parser.add_argument('-c', '--container',
                            type=Container,
                            default=Container.WEBM,
                            action=EnumAction,
                            help='output file container type')
        parser.add_argument('-q', '--quality',
                            type=Quality,
                            default=Quality.MONO_VOICE,
                            action=EnumAction,
                            help='output file opus quality')
        parser.add_argument('-s', '--quiet',
                            action='store_true',
                            help='silence output')
        # fix: the original passed '-C' '--combine-titles' with no comma,
        # which concatenated into the single bogus option '-C--combine-titles'
        parser.add_argument('-C', '--combine-titles',
                            dest='combine_titles',
                            action='store_true',
                            help="combine parent chapter title with child's when nested")
        parser.add_argument('output',
                            help='output directory')
        parser.add_argument('inputs',
                            nargs='+',
                            help='input file(s) or a single directory')
        parser.parse_args(namespace=self)

    @property
    def cancelled(self):
        '''True once cancel() has been requested.'''
        return self._cancel_event.is_set()

    def _job_done_cb(self, future: Future):
        '''Thread-pool completion callback: update counters and report errors.'''
        with self._job_stats_lock:
            self._active -= 1
            self._finished += 1
        if self.cancelled:
            return
        exc = future.exception()
        msg = None
        if exc:
            #can't avoid broken pipe error upon interruption during stdio
            if isinstance(exc, (OperationCancelled, BrokenPipeError)):
                pass
            else:
                self._failed += 1
                if isinstance(exc, CalledProcessError):
                    cmd = ' '.join(quote(arg) for arg in exc.cmd)
                    msg = f'Exec failed with code {exc.returncode}: "{cmd}"'
                else:
                    msg = f'Something broke...\n{"".join(traceback.format_exception(exc))}'
        elif msg := future.result():
            pass
        if msg:
            self.print(msg)

    def cancel(self):
        '''Request cancellation: signal workers and drop queued jobs.'''
        if self.cancelled:
            return
        self._cancel_event.set()
        self._executor.shutdown(wait=False, cancel_futures=True)

    def main(self) -> int:
        '''Run the job queue to completion; returns the process exit code.'''
        self._start_time = datetime.now()
        self.print(f'Started with {pluralize(len(self.inputs), "input")}')
        # print progress every PROGRESS_INTERVAL while polling every POLLING_INTERVAL
        progress_loops = PROGRESS_INTERVAL / POLLING_INTERVAL
        i = progress_loops
        self._queue = [AaxcFile(i, self.combine_titles) for i in self.inputs]
        return_code = 0
        while True:
            i += 1
            if i >= progress_loops:
                self.print(progress=True)
                i = 0
            # top up the pool to the configured thread limit
            while self._active < self.threads and len(self._queue):
                self._active += 1
                tc_args = (self.output, self.quality, self.container, self._cancel_event, self.print)
                future = self._executor.submit(AaxcTools.transcode, self._queue.pop(), *tc_args)
                future.add_done_callback(self._job_done_cb)
            if not self._active and not len(self._queue):
                break
            if self._cancel_event.wait(POLLING_INTERVAL):
                return_code = 1
                break
        if self._failed:
            return_code = 1
        end_time = datetime.now()
        duration = (end_time - self._start_time).total_seconds()
        status_str = f'with {pluralize(self._failed, "failure")}' if self._failed else '(cancelled)' if self.cancelled else 'successfully'
        self.print(f'Finished {status_str}, elapsed: {duration:.3f}s')
        return return_code

    def print(self, *args, progress=False, **kwargs):
        '''Timestamped, lock-protected print that keeps the progress line at the bottom.'''
        if self.quiet:
            return
        with self._print_lock:
            reprint_progress = False
            clear_progress = self._printed_progress
            if progress:
                args = self._gen_progress()
                self._printed_progress = True
            else:
                reprint_progress = self._printed_progress and not self.cancelled
                self._printed_progress = False
            have_args = len(args) > 0
            if have_args:
                args = (f'[{datetime.now().strftime(PRINT_TIME_FMT)}]', *args)
            if clear_progress:
                # move up one line and clear it (overwrites the old progress line)
                args = (f'\r\033[1A\033[0K{args[0]}', *args[1:]) if have_args else ('\r\033[1A\033[0K', )
            print(*args, **kwargs)
            if reprint_progress and self._active:
                self.print(progress=True)

    def _gen_progress(self):
        '''Build the single-line progress summary tuple.'''
        elapsed = (datetime.now() - self._start_time).total_seconds()
        run_str = f'{self._active} running'
        q_str = f'{len(self._queue)} queued'
        fail_str = f'({pluralize(self._failed, "failure")})' if self._failed else ''
        finish_str = f'{self._finished} finished'
        return (f'> {elapsed:.0f}s | {run_str} | {q_str} | {finish_str}{fail_str}',)
# main
if __name__ == "__main__":
    app = App()
    if not os.path.isdir(app.output):
        app.print(f'Error: output is not a directory: {app.output}')
        sys.exit(1)
    first_input = app.inputs[0]
    if os.path.isdir(first_input):
        # directory mode: transcode every aaxc inside; extra inputs are ignored
        if len(app.inputs) > 1:
            app.print('Warning: ignoring additional inputs in directory input mode')
        found = glob(f'{first_input}/*.aaxc')
        if not found:
            app.print(f'Error: input directory contains no aaxc files: {first_input}')
            sys.exit(1)
        app.inputs = found
    else:
        # file mode: every listed input must exist
        missing = [f for f in app.inputs if not os.path.isfile(f)]
        if missing:
            app.print(f'Error: input file not found: {missing[0]}')
            sys.exit(1)
    signal(SIGINT, lambda *_: app.cancel())
    sys.exit(app.main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment