Last active
March 13, 2022 12:26
-
-
Save hhsprings/1e0aa173fd58395d2c4e7cd303571b43 to your computer and use it in GitHub Desktop.
minimal playlist generator (for Windows Media Player, MPC-HC, etc)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! py -3 | |
# -*- coding: utf-8 -*- | |
# required: python3, mako, dateutil | |
r""" | |
easiest usage: | |
$ python3 mpplgen.py out.m3u8 --from_askopenfilenames --postproc_simple_editor | |
from pattern: | |
$ python3 mpplgen.py out.m3u8 --from_pattern='[1-3]*.wav' | |
from existing playlist: | |
$ python3 mpplgen.py out.m3u8 --from_list=original.wpl | |
If the format estimation when reading with "--from_list" is wrong, | |
you can force the reading format with "--listfile_reading_format": | |
$ python3 mpplgen.py out.m3u8 --from_list=x.xml --listf=RSS | |
$ python3 mpplgen.py out.m3u8 \ | |
--from_list=https://www.pbs.org/newshour/feeds/rss/podcasts/show \ | |
--listf=RSS | |
Also for writing format, if you want to write ".m3u" without "Extended M3U" | |
extension, it's easy to specify --writing_format: | |
$ python3 mpplgen.py out.m3u --from_p='*.mp3' --writing_format=TXT | |
If you want to ignore certain media after enumerating the media
to play, you can specify an "accept" filter:
$ python3 mpplgen.py out.mpl --from_p='*.mp3' --filter_module=yourmod.py | |
yourmod.py must implement 'accept', for example: | |
---------------------------- | |
def accept(media): | |
return "-NoSE" not in media["filename"] | |
---------------------------- | |
If you edit the media dictionary in the "accept" function, it will | |
be carried over to the output, so you can supplement the information | |
with, for example, mp3-tagger: | |
---------------------------- | |
# -*- coding: utf-8 -*- | |
import os | |
from mp3_tagger import MP3File | |
def accept(media): | |
t = MP3File(media["filename"]) | |
for k, v in t.get_tags()["ID3TagV2"].items(): | |
wk = { | |
"song": "title", | |
"track": "tracknumber", | |
}.get(k, k) | |
media[wk] = v[:v.index("\x00")] | |
media["file_size"] = os.stat(media["filename"]).st_size | |
return True | |
---------------------------- | |
most flexible usage: | |
$ python3 mpplgen.py out.m3u8 --from_generator_module=yourmod.py | |
yourmod.py must implement 'medias', for example: | |
---------------------------- | |
from glob import glob | |
def medias(*args): | |
return list(glob("*.flac")) | |
---------------------------- | |
---------------------------- | |
import io | |
import csv | |
def medias(*args): | |
# This example is for csv generated by "JRiver Media Center". | |
reader = csv.reader(io.open("pl.csv", encoding="utf-8")) | |
next(reader) | |
for row in reader: | |
yield dict( | |
filename=row[0], title="{} - {}".format(row[2], row[1])) | |
---------------------------- | |
---------------------------- | |
# This example is for ".plp". | |
# You can pass args via --args_for_generator_module, for example: | |
# $ python3 mpplgen.py pl.wpl --from_ge=yourmod.py --args_='"play.plp"' | |
import io | |
import re | |
def medias(*args): | |
rgx = re.compile(r"HARP, MUSIC(.*)") | |
with io.open(args[0], encoding="mbcs") as fi: | |
for line in fi.readlines(): | |
m = rgx.match(line.strip()) | |
if m: | |
yield dict(filename=m.group(1)) | |
---------------------------- | |
In some cases "--readcontent_converter_module" may be easier to use: | |
$ python3 mpplgen.py out.m3u8 --from_l=some.smp --readco=yourmod.py | |
yourmod.py must implement 'convert', for example: | |
---------------------------- | |
import re | |
def convert(cont): | |
result = [] | |
for line in re.split(r"[ \t]*\r?\n", cont): | |
m = re.match(r"File=(.*)", line.strip()) | |
if m: | |
result.append(m.group(1)) | |
return "\n".join(result) | |
---------------------------- | |
---------------------------- | |
def convert(cont): | |
result = [] | |
for line in re.split(r"[ \t]*\r?\n", cont): | |
s = line.strip() | |
if s and s[0] != "#": | |
result.append(s) | |
return "\n".join(result) | |
---------------------------- | |
mpc-hc can't handle rss feeds directly, but it can play media via the http | |
protocol, so you can: | |
$ PATH="c:/Program Files/MPC-HC/;${PATH}" mpplgen.py pl.m3u8 \ | |
--from_l="https://podcast.posttv.com/itunes/post-reports.xml" \ | |
--exec_p=mpc-hc64 | |
""" | |
from __future__ import unicode_literals | |
import io | |
import codecs | |
import os | |
import posixpath | |
import sys | |
import re | |
import logging | |
import importlib | |
import csv | |
import subprocess | |
import mimetypes | |
import json | |
import functools | |
import plistlib | |
import struct | |
import ctypes | |
from collections import defaultdict | |
from textwrap import dedent, wrap | |
from glob import glob | |
from datetime import datetime | |
from itertools import groupby | |
import xml.etree.ElementTree as ElementTree | |
# Browser-like User-Agent string used for the HTTP(S) requests this script
# issues through urllib.
__USER_AGENT__ = "\
Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
AppleWebKit/537.36 (KHTML, like Gecko) \
Chrome/91.0.4472.124 Safari/537.36"
import ssl
# NOTE(review): hostname checking and certificate verification are both
# switched off on this context, so HTTPS peers are not authenticated.
# Presumably done to tolerate misconfigured feed servers -- confirm that
# this trade-off is acceptable.
_htctxssl = ssl.create_default_context()
_htctxssl.check_hostname = False
_htctxssl.verify_mode = ssl.CERT_NONE
from urllib.request import urlretrieve as urllib_urlretrieve | |
from urllib.request import unquote as urllib_unquote | |
from urllib.request import quote as urllib_quote | |
from urllib.parse import urlsplit as urllib_urlsplit | |
from urllib.parse import urlunsplit as urllib_urlunsplit | |
from urllib.parse import urljoin as urllib_urljoin | |
try: | |
import tkinter | |
from tkinter.filedialog import askopenfilenames | |
from tkinter.simpledialog import Dialog | |
_HASNOTKINTER = False | |
except ImportError: | |
_HASNOTKINTER = True | |
from configparser import ConfigParser | |
import urllib.request | |
# Build an opener that uses the (non-verifying) SSL context and sends the
# browser-like User-Agent, then install it process-wide so that later
# urllib.request calls use it by default.
https_handler = urllib.request.HTTPSHandler(context=_htctxssl)
opener = urllib.request.build_opener(https_handler)
opener.addheaders = [('User-Agent', __USER_AGENT__)]
urllib.request.install_opener(opener)
from configparser import ConfigParser | |
import dateutil # pip install dateutil (no support for python 2.7) | |
import dateutil.tz | |
import dateutil.parser | |
from mako.template import Template # pip install mako | |
# Base name of this script without its extension; used as the logger name.
__MYNAME__, _ = os.path.splitext(
    os.path.basename(sys.modules[__name__].__file__))
__VERSION__ = "0.9.1.0"
_log = logging.getLogger(__MYNAME__)
# Legacy ("native") text encoding: tried as a fallback when reading text
# files, and the default of W_BASE.write(native_encoding=...).
NATIVE_ENCODING = "cp932"
# Entry point used to parse XML text into an element tree.
ET_fromstring = ElementTree.XML
# Placeholder element handed out (wrapped) by ETWrapper.find() when nothing
# matches: empty tag, empty text and no children, so its wrapper is falsy.
_et_empty_elem = ElementTree.Element("")
_et_empty_elem.text = ""


class ETWrapper(object):
    """Convenience wrapper around an ElementTree element.

    * ``find`` never returns ``None``; a falsy wrapper around *fallback*
      is returned instead, so lookups can be chained.
    * ``findall`` and iteration yield wrapped elements.
    * ``attrib_lc`` / ``tag_lc`` give lower-cased attribute names / tag.
    * Any other attribute is delegated to the wrapped element.
    """

    def __init__(self, etobj):
        self._et = etobj
        self._attrib_lc = {
            k.lower(): v
            for k, v in etobj.attrib.items()}
        self._it = None

    def __getattr__(self, a):
        # Only called for names not found normally.  Guard "_et" itself so
        # a half-initialised instance (copy/pickle) cannot recurse forever.
        if a == "_et":
            raise AttributeError(a)
        return getattr(self._et, a)

    def find(self, path, namespaces=None, fallback=_et_empty_elem):
        """Return the first match for *path*, or a wrapped *fallback*."""
        res = self._et.find(path, namespaces)
        if res is None:
            return ETWrapper(fallback)
        return ETWrapper(res)

    def findall(self, path, namespaces=None, fallback=_et_empty_elem):
        """Yield every match for *path*, wrapped.

        *fallback* is accepted for signature symmetry with ``find`` and is
        not used.
        """
        for el in self._et.findall(path, namespaces):
            yield ETWrapper(el)

    def __iter__(self):
        # Return an independent iterator per call: the previous
        # implementation returned ``self`` with one shared cursor, so nested
        # or repeated iteration over the same wrapper silently lost items.
        # The shared cursor is still reset so legacy ``next(wrapper)`` works.
        self._it = iter(self._et)
        return (ETWrapper(child) for child in self._et)

    def __next__(self):
        if self._it is None:
            self._it = iter(self._et)
        return ETWrapper(next(self._it))

    def __bool__(self):
        # Truthy when the element has children or a non-blank tag.
        t = self._et.tag
        return len(self._et) > 0 or \
            not (t is None or t.strip() == "")

    def __repr__(self):
        outer = super().__repr__()
        inner = repr(self._et)
        return outer.replace("object ", "object ({}) ".format(inner))

    @property
    def tag_lc(self):
        """Lower-cased tag of the wrapped element."""
        return self._et.tag.lower()

    @property
    def attrib_lc(self):
        """Attributes of the wrapped element keyed by lower-cased name."""
        return self._attrib_lc

    @property
    def text_stripped(self):
        """Element text with surrounding whitespace removed, or ``None``
        when the element has no (or empty) text."""
        text = self._et.text
        if text:
            return text.strip()

    @property
    def text_rstripped(self):
        """Element text with trailing whitespace removed, or ``None``
        when the element has no (or empty) text."""
        text = self._et.text
        if text:
            return text.rstrip()
def to_int(s):
    """
    Convert a number or numeric string to ``int``, truncating any fraction.

    Integer input is converted exactly; going through ``float`` is only a
    fallback, because ``float`` cannot represent large integers precisely.

    >>> print(to_int("3.23"))
    3
    >>> print(to_int(3.23))
    3
    >>> print(to_int("9007199254740993"))
    9007199254740993
    """
    try:
        return int(s)
    except (ValueError, TypeError):
        # e.g. "3.23" or "1e3": not an integer literal, truncate the float.
        return int(float(s))
def parse_time(s):
    """Parse a time value into seconds (``float``).

    Accepts anything ``float()`` accepts, or a string of the form
    ``[sign][[H:]M:]S[.fraction]``.

    Raises ``ValueError`` for more than three colon-separated fields or
    for non-numeric fields.
    """
    try:
        return float(s)
    except ValueError:
        pass
    text = s.strip()
    # A leading sign applies to the whole value, not just the first field
    # ("-1:30" is -90 seconds, not -60 + 30).
    sign = 1
    if text[:1] in ("-", "+"):
        if text[0] == "-":
            sign = -1
        text = text[1:]
    whole, dot, frac = text.rpartition(".")
    if not dot:
        whole, frac = text, "0"
    fields = whole.split(":")
    if len(fields) > 3:
        raise ValueError("'{}' is not valid time.".format(s))
    total = 0
    for field in fields:
        total = total * 60 + int(field)
    total += int(frac) / float(10 ** len(frac))
    return sign * total
def ts_to_tss(ts, frac=3):
    """Format *ts* seconds as ``[-]HH:MM:SS.fff``.

    *frac* is the number of fractional digits; with ``frac=0`` no decimal
    point is emitted.  The minus sign is only written when the rounded
    value is non-zero, so tiny negative inputs do not render as "-0".
    """
    whole, _, fraction = (("%%.%df" % frac) % ts).partition(".")
    total = abs(int(whole))
    hours, rest = divmod(total, 3600)
    minutes, seconds = divmod(rest, 60)
    negative = ts < 0 and (total > 0 or fraction.strip("0") != "")
    text = "%s%02d:%02d:%02d" % (
        "-" if negative else "", hours, minutes, seconds)
    if fraction:
        text += "." + fraction
    return text
def _wrap_longwarn(warntext, width=80): | |
return '\n' + "=" * width + '\n' + "\n".join( | |
wrap(dedent(warntext), width)) + '\n' + "=" * width | |
def open_textfile(fn, expected_contains=(), preffered_encoding=""):
    """Open text file *fn* for reading with a guessed encoding.

    Guessing order:

    1. a Unicode BOM, or -- for BOM-less UTF-16/32 -- an aligned occurrence
       of one of the probe texts (*expected_contains* followed by a
       newline) encoded in that encoding;
    2. the first of *preffered_encoding*, utf-8, ``NATIVE_ENCODING`` and
       iso-8859-1 that decodes the file and contains one of the probe
       texts;
    3. the platform default encoding.

    Returns an open text file object positioned at the start; the caller
    is responsible for closing it.
    """
    chktxts = list(expected_contains) + [
        # assuming text, it should contain newline.
        "\n"
    ]

    # check utf families first
    def _guess_utf_encoding(fn):
        with io.open(fn, "rb") as fi:
            raw = fi.read()
        # UTF-32 must be tested before UTF-16: the UTF-32-LE BOM starts
        # with the UTF-16-LE BOM.
        if raw[:4] in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
            return "utf-32"
        if raw[:3] == codecs.BOM_UTF8:
            return "utf-8-sig"
        if raw[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
            return "utf-16"
        for ct in chktxts:
            for enc, width in (
                    ("utf-32-be", 4), ("utf-32-le", 4),
                    ("utf-16-be", 2), ("utf-16-le", 2)):
                pos = raw.find(ct.encode(enc))
                # a real hit must start on a code-unit boundary
                if pos >= 0 and pos % width == 0:
                    return enc
        return None

    detenc = _guess_utf_encoding(fn)
    if detenc:
        return io.open(fn, encoding=detenc)
    #
    candencs = ["utf-8", NATIVE_ENCODING, "iso-8859-1"]
    if preffered_encoding and preffered_encoding not in candencs:
        candencs.insert(0, preffered_encoding)
    for enc in candencs:
        fi = io.open(fn, encoding=enc)
        try:
            cont = fi.read()
        except UnicodeError:
            # wrong guess: release the handle and try the next candidate
            fi.close()
            continue
        except Exception:
            fi.close()
            raise
        if any(ec in cont for ec in chktxts):
            fi.seek(0)
            return fi
        fi.close()
    return io.open(fn)  # ascii?
def path_noext(fn):
    """Return the base name of *fn* with its last extension removed."""
    stem, _ext = os.path.splitext(fn)
    return os.path.basename(stem)
def _topath_if_file(fn): | |
if not fn: | |
return fn | |
fn = fn.replace("\\", "/") | |
# The following formats are especially problematic: | |
# file:c:/Windows | |
# file:relpath.mp3 | |
# file://localhost/c:/Windows | |
# file:/dev/audio | |
scheme, netloc, path, query, fragment = urllib_urlsplit(fn) | |
if scheme in ("", "file"): | |
if netloc and not path: | |
# "file:relpath.mp3" | |
# i don't know this is valid, but it is foobar2000's form. | |
netloc, path = path, netloc | |
# in this case, ignore netloc (maybe "localhost") | |
if re.match(r"/[a-z]:", path, flags=re.I): | |
# Windows path | |
path = path[1:] | |
return urllib_urlunsplit(("", "", path, query, fragment)) | |
return fn | |
def path_split(fn):
    """Split *fn* into ``(scheme, rest)``.

    ``scheme`` includes the trailing ``://`` and is ``""`` for plain paths
    (``file:`` URLs are converted to plain paths first).
    """
    fn = _topath_if_file(fn)
    found = re.match(r"([a-z]+://)(.*)", fn)
    if not found:
        return "", fn
    scheme, pathpart = found.group(1, 2)
    if re.match(r"/[a-z]:/", pathpart, flags=re.I):
        # windows form, but "file:///"
        pathpart = pathpart[1:]
    return scheme, pathpart
def path_join2(par, chi):
    """Resolve child *chi* against parent *par* (each a path or URL).

    * An empty parent, or a child that carries its own scheme, yields the
      child.
    * A root-relative child under a URL parent is resolved with
      ``urljoin``.
    * Otherwise the two paths are joined and normalised, keeping the
      parent's scheme.
    """
    if not par:
        return chi
    chi = _topath_if_file(chi)
    child_scheme, child_path = path_split(chi)
    if child_scheme:
        return chi
    par = _topath_if_file(par)
    parent_scheme, parent_path = path_split(par)
    if parent_scheme and child_path.startswith("/"):
        return urllib_urljoin(par, chi)
    joined = os.path.normpath(os.path.join(parent_path, child_path))
    # parent_scheme is "" for plain paths, so this covers both cases
    return parent_scheme + joined.replace("\\", "/")
def urlunquote(fn, encoding="utf-8"):
    """Decode %xx escapes in *fn*, interpreting the bytes as *encoding*."""
    try:
        decoded = urllib_unquote(fn, encoding=encoding)
    except TypeError:
        # fallback for an unquote() without the ``encoding`` keyword
        decoded = urllib_unquote(fn).decode(encoding)
    return decoded
def path_urlquote(fn, encoding="utf-8"):
    """Percent-encode each "/"-separated segment of *fn*.

    The scheme (``xxx://``) and any Windows drive prefix are left
    unquoted.
    """
    def _quote_segment(segment):
        try:
            return urllib_quote(segment, encoding=encoding)
        except TypeError:
            return urllib_quote(segment.encode(encoding))

    scheme, rest = path_split(fn)
    drive, tail = os.path.splitdrive(rest)
    quoted = "/".join(map(_quote_segment, tail.split("/")))
    return scheme + drive + quoted
def _prep_normpath(fn, **options):
    """Convert *fn* from a ``file:`` URL to a path and, when the
    ``needs_urlunquote`` option is set, decode its %xx escapes using
    ``urlunquote_encoding`` (default utf-8)."""
    fn = _topath_if_file(fn)
    if not options.get("needs_urlunquote"):
        return fn
    return urlunquote(fn, options.get("urlunquote_encoding", "utf-8"))
def normpath(fn, playlist_outdir=".", **options):
    """Return *fn* in the form in which it is written into a playlist.

    Options (all optional):

    pathmode            "rel" (default): make local paths relative to
                        *playlist_outdir*; "abs": make them absolute;
                        any other value: keep them as given.
    require_pathscheme  "never": reject anything with a scheme;
                        "except_file" (default): local files are written
                        without scheme; "must": local files get a
                        ``file:`` / ``file://`` prefix.
    needs_urlquote      percent-encode the result.
    urlquote_encoding   encoding used for percent-encoding (utf-8).
    extra_urlquery      dict of ``key=value`` pairs appended to the query.

    Raises ``ValueError`` when a scheme is present but not allowed.
    """
    pathmode = options.get("pathmode", "rel")
    # require_pathscheme: never, except_file, must
    require_pathscheme = options.get("require_pathscheme", "except_file")
    needs_urlquote = options.get("needs_urlquote", False)
    urlquote_encoding = options.get("urlquote_encoding", "utf-8")
    extra_urlquery = options.get("extra_urlquery")
    if extra_urlquery:
        scheme, netloc, url, query, fragment = urllib_urlsplit(fn)
        # drop empty segments so an empty query does not yield "?&k=v"
        pairs = [q for q in query.split("&") if q]
        pairs.extend(
            "{}={}".format(*it) for it in extra_urlquery.items())
        fn = urllib_urlunsplit(
            (scheme, netloc, url, "&".join(pairs), fragment))
    scheme, pathpart = path_split(fn)
    isfile = not scheme
    if require_pathscheme == "never" and not isfile:
        raise ValueError("scheme '{}' is not allowed".format(scheme))
    warn = False
    if isfile:
        if require_pathscheme == "must":
            if pathmode == "abs" or (
                    pathmode == "keep" and os.path.isabs(pathpart)):
                scheme = "file://"
            else:
                scheme = "file:"
                warn = True
        else:
            scheme = ""
        if pathmode == "abs":
            pathpart = os.path.abspath(pathpart)
        elif pathmode == "rel":
            # Input paths are relative to the current directory; express
            # them relative to the playlist's directory instead.  Resolve
            # against the cwd exactly once -- rebasing first and then
            # calling abspath() would apply the cwd->outdir offset twice.
            lod = os.path.abspath(playlist_outdir)
            ppd, ppb = os.path.split(os.path.abspath(pathpart))
            try:
                pathpart = os.path.join(os.path.relpath(ppd, lod), ppb)
            except ValueError as e:
                # for example, we can't join "c:/" and "e:/".
                _log.warning(
                    "We cannot calculate a relpath to %r, because: %r",
                    pathpart, e)
    fn = scheme + pathpart.replace("\\", "/")
    if needs_urlquote:
        fn = path_urlquote(fn, urlquote_encoding)
    if warn:
        _log.warning(
            """%r is a valid in terms of rfc3986, but almost all """ +
            """players cannot interpret it correctly. """ +
            """So, you will have to change it to absolute path or """ +
            """path representation without schema.""", fn)
    return fn
_urlretrieved = dict() | |
def _urlretrieve(fn): | |
if fn in _urlretrieved: | |
return _urlretrieved[fn] | |
# | |
res = fn | |
m = re.match(r"([a-z]+://)(.*)", res) | |
if m: | |
if m.group(1) == "file://": | |
return res | |
try: | |
res, _ = urllib_urlretrieve(fn) | |
# import httplib2, tempfile | |
# http = httplib2.Http() | |
# resp, cont = http.request(fn) | |
# res = tempfile.mktemp() | |
# with io.open(res, "wb") as fo: | |
# fo.write(cont) | |
_log.info( | |
"retrieved contents from %r, and saved as %r", | |
fn, res.replace("\\", "/")) | |
except Exception as e: | |
raise ValueError( | |
"could not read '{}': {}".format( | |
fn, e)) | |
_urlretrieved[fn] = res | |
return res | |
def filegroup_tree(medias):
    """Arrange *medias* into a nested dict following their "filegroup".

    A filegroup ``"x/y"`` puts the media under ``tree["x"]["x/y"]``: each
    level is keyed by the cumulative group path.  The medias that belong
    directly to a level are collected in a list under the key ``""``.
    """
    tree = {}
    for media in medias:
        media["filename"]  # every media must carry a filename
        segments = [
            seg for seg in media.get("filegroup", "").split("/") if seg]
        node = tree
        for depth in range(1, len(segments) + 1):
            node = node.setdefault("/".join(segments[:depth]), {})
        node.setdefault("", []).append(media)
    return tree
class _NoNullDict(dict): | |
def _accept(self, v): | |
# avoid "exsiting but value is None or empty string" | |
return (v is not None and "{}".format(v).strip()) | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
for k, v in list(self.items()): | |
if not self._accept(v): | |
del self[k] | |
def __setitem__(self, k, v): | |
if self._accept(v): | |
super().__setitem__(k, v) | |
class _MediaDict(_NoNullDict):
    """Dictionary describing one media item, with normalised fields.

    * "duration" (seconds) and "duration_ms" are kept in sync: setting
      either one stores both, as strings.  Unparsable durations are
      dropped.
    * "pubdate" / "date_created" are stored under "date_created" as an
      ISO-8601 string.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # dict construction bypasses __setitem__, so normalise here.
        if "duration" in self and "duration_ms" not in self:
            s, ms = self._durations("duration", self["duration"])
            if s is not None:
                self["duration_ms"] = ms
            else:
                del self["duration"]
        elif "duration_ms" in self and "duration" not in self:
            s, ms = self._durations("duration_ms", self["duration_ms"])
            if s is not None:
                self["duration"] = s
            else:
                del self["duration_ms"]
        if "pubdate" in self or "date_created" in self:
            pd = self.pop("pubdate", "")
            dc = self.pop("date_created", "")
            # "date_created" wins over "pubdate" when both are given
            if dc:
                self["date_created"] = dc
            elif pd:
                self["date_created"] = pd

    def get_title(self, with_artist=False):
        """Return the title, falling back to the extension-less file name;
        with *with_artist*, prefix it with ``"<artist> - "`` if known."""
        title = self.get("title", path_noext(self.get("filename", "")))
        if with_artist and "artist" in self:
            return self.get("artist") + " - " + title
        return title

    def _durations(self, k, v):
        """Return ``(seconds, milliseconds)`` as strings for value *v*
        given under key *k*, or ``(None, None)`` if *v* is not a time."""
        if v and isinstance(v, str):
            # values such as "90s": drop the unit marker
            if re.match(r"(\d+)s(.\d+)?", v):
                v = v.replace("s", "")
        if k == "duration_ms":
            try:
                ms = to_int(v)
                s = float(ms / 1000)
                return "{}".format(s), "{}".format(ms)
            except ValueError:
                # maybe "indefinite"
                return None, None
        else:
            try:
                s = parse_time(v)
                ms = int(s * 1000)
                return "{}".format(s), "{}".format(ms)
            except ValueError:
                # maybe "indefinite"
                return None, None

    def _fromdate(self, v):
        """Convert a POSIX timestamp or a date string to ISO-8601."""
        # accept numbers / datetime objects as well as strings
        v = "{}".format(v)
        if re.match(r"\d+(?:\.\d+)?$", v):  # posix timestamp
            # timezone-aware replacement for the deprecated
            # datetime.utcfromtimestamp()
            dt = datetime.fromtimestamp(
                to_int(v), tz=dateutil.tz.tzutc())
        else:
            try:
                dt = dateutil.parser.parse(v)
            except ValueError:
                # dateutil's ParserError is a ValueError subclass; catching
                # the base class avoids depending on a private module path.
                # Unknown string format: Fri, 16 Jul 2021 21:09:19 Europe/Dublin
                t, _, z = v.strip().rpartition(" ")
                dt = dateutil.parser.parse(t).replace(
                    tzinfo=dateutil.tz.gettz(z))
        # store as iso-8601
        return dt.strftime("%Y-%m-%dT%H:%M:%S%z")

    def __setitem__(self, k, v):
        if not self._accept(v):
            return
        if k in ("duration", "duration_ms"):
            if v != "":
                s, ms = self._durations(k, v)
                # (None, None) is refused by _NoNullDict.__setitem__
                super().__setitem__("duration", s)
                super().__setitem__("duration_ms", ms)
        elif k in ("date_created", "pubdate",):
            super().__setitem__("date_created", self._fromdate(v))
        else:
            super().__setitem__(k, v)
class W_BASE(object):
    """Base class of playlist writers.

    Subclasses provide a ``template`` (an object with ``render(**data)``)
    and an ``encoding`` attribute; an optional ``writer_warning`` is logged
    on every write.
    """

    def _post_render(self, rendered):
        """Hook for subclasses to adjust the rendered text."""
        return rendered

    def write(self, fn, native_encoding=NATIVE_ENCODING, **tmpldata):
        """Render the template with *tmpldata* and write it to *fn*.

        A writer whose encoding equals ``NATIVE_ENCODING`` uses
        *native_encoding* instead.  Raises ``NotImplementedError`` when the
        class has no usable template.
        """
        pltype = os.path.splitext(fn)[1]
        tmpl = getattr(self, "template", None)
        enc = getattr(self, "encoding", None)
        if enc == NATIVE_ENCODING and native_encoding != NATIVE_ENCODING:
            enc = native_encoding
        if not tmpl:
            raise NotImplementedError("can't write as '{}'".format(pltype))
        if hasattr(self, "writer_warning"):
            _log.warning(self.writer_warning)
        tmpldata["playlist_encoding"] = enc
        rendered = self._post_render(tmpl.render(**tmpldata))
        with io.open(fn, "w", encoding=enc) as fo:
            fo.write(rendered)
class CUSTOM_WRITER(W_BASE):
    """Writer driven by a user-supplied mako template file."""

    def __init__(self, tmpl, enc):
        # W_BASE.write() reads these two attributes.
        self.encoding = enc
        self.template = Template(filename=tmpl)
class R_BASE(object):
    """Base class of playlist readers.

    Holds the playlist text (``_cont``), the mandatory base location
    (``_mandbase``) and the reader options (``_options``).
    """

    def __init__(self, fn, cont, mandbase="", **options):
        try:
            converter = options["readcontent_converter"]
        except KeyError:
            pass
        else:
            # user module hook: rewrite the raw content before parsing
            cont = converter.convert(cont)
        self._cont = cont
        self._mandbase = mandbase
        self._options = dict(options)

    def playlist_meta(self):
        """Return playlist-level metadata; none by default."""
        return dict()

    def filename_conv(self, fn, base=None):
        """Normalise entry *fn* and resolve it against *base* (if given)
        and then against the mandatory base."""
        res = _prep_normpath(fn.rstrip(), **self._options)
        if base:
            res = path_join2(base, res)
        res = path_join2(self._mandbase, res)
        scheme = path_split(fn)[0]
        if not scheme:
            _log.debug("%r -> %r", fn, res)
        return res
class BSHTML_READER(object):
    r"""
    This reader is not chosen by an input-based decision and is only
    used by explicit instantiation.

    ex.)
      $ python3 mpplgen.py \
          --from_l="http://www.radiofeeds.co.uk/mp3.asp" \
          --li=BSHTML_READER \
          --additional_reader_params='{
            "bsreader_findcriterias": [{
              "name": "td",
              "attrs": {"width": "68%"}
            }, {
              "name": "a",
              "attrs": {"target": null}
            }],
            "bsreader_fngetter": "attrs, href",
            "bsreader_fnpattern": "https?://.*"}' \
          --exec_p=vlc

      $ wget "http://openmusicarchive.org/browse_tag.php?tag=female%20vocal" \
          -O oma.html
      $ python3 mpplgen.py \
          --from_l=oma.html \
          --li=BSHTML_READER \
          --additional_reader_params='{
            "bsreader_bs4_features": "lxml",
            "bsreader_findcriterias": [{
              "name": "a",
              "text": "Download MP3",
              "attrs": {"class": "link", "target": "_blank"}
            }],
            "bsreader_fngetter": "attrs, href",
            "bsreader_fnpattern": ".*\\.mp3",
            "bsreader_base": "http://openmusicarchive.org"
          }'

      $ wget "https://myanimelist.net/anime/9253/Steins_Gate/video" \
          -O oma.html
      $ python3 mpplgen.py \
          --from_l=oma.html \
          --li=BSHTML_READER \
          --additional_reader_params='{
            "bsreader_findcriterias": [{
              "name": "div",
              "attrs": {"class": "video-list-outer po-r pv"}
            }, {
              "name": "a",
              "attrs": {"class": "iframe js-fancybox-video video-list di-ib po-r"}
            }],
            "bsreader_fngetter": "attrs, href"}'

    Python's urlretrieve and wget may not work because the server may limit
    returns depending on the type of client. In such a case, use the "View
    Source" of the web browser to manually obtain the html source. Or,
    you may want to use browser's headless mode like:

      $ "/c/Program Files (x86)/Google/Chrome/Application/chrome" --headless \
          --disable-gpu \
          --dump-dom \
          "http://openmusicarchive.org/browse_tag.php?tag=New%20York" \
          > browse_tag.html

    Then you can:

      $ fma_in="${1:-https://freemusicarchive.org/genre/Blues}"
      $ fma_out=/tmp/tmp$$
      $ "/c/Program Files (x86)/Google/Chrome/Application/chrome" --headless \
          --disable-gpu \
          --dump-dom "${fma_in}" > "${fma_out}"
      $ python3 mpplgen.py \
          --from_l="${fma_out}" \
          --li=BSHTML_READER \
          --additional_reader_params='{
            "bsreader_findcriterias": [{
              "name": "div",
              "attrs": {"class": "play-item"}
            }],
            "bsreader_fngetter": "json_in_attrs, data-track-info, fileUrl",
            "bsreader_base": "https://freemusicarchive.org"}' \
          --exec_p=mpc-hc64
    """
    class R(R_BASE):
        def __init__(self, fn, cont, mandbase="", **options):
            super().__init__(fn, cont, mandbase, **options)
            try:
                import bs4
            except ImportError as e:
                raise ValueError(
                    "You must install beautifulsoup (bs4) " +
                    "if you want to use BSHTML_READER.") from e
            bs4_features = options.get(
                "bsreader_bs4_features", "html.parser")
            self._soup = bs4.BeautifulSoup(
                self._cont, features=bs4_features)
            if not options.get("bsreader_findcriterias"):
                raise ValueError(
                    "bsreader_findcriterias is not specified. " +
                    "you must specify it via --additional_reader_params.")
            self._findcriterias = options.get("bsreader_findcriterias")
            # "kind, key[, key2]" -> ["kind", "key", ...]
            self._fngetter = re.split(
                r"\s*,\s*", options.get("bsreader_fngetter", "attrs, href"))
            self._fnpattern = re.compile(
                options.get("bsreader_fnpattern", r".*"))
            self._fnrejpattern = None
            if options.get("bsreader_fnrejpattern"):
                self._fnrejpattern = re.compile(
                    options.get("bsreader_fnrejpattern"))
            self._base = options.get("bsreader_base", mandbase)
            self._fnconv = options.get("bsreader_fnconv")

        def playlist_meta(self):
            """Use the page <title> (or the base URL) as playlist title."""
            title = self._base
            te = self._soup.find("title")
            if te:
                title = te.text.strip()
            return _NoNullDict(
                playlist_title=title)

        def _extract_fn(self, el):
            """Pull the raw file name out of element *el* according to the
            configured "bsreader_fngetter"."""
            kind = self._fngetter[0]
            if kind == "attrs":
                return el.attrs.get(self._fngetter[1])
            if kind == "json_in_attrs":
                if len(self._fngetter) < 3:
                    raise ValueError(
                        "require two keys, for attrs and for json")
                raw = el.attrs.get(self._fngetter[1])
                if not raw:
                    # element without the JSON attribute: nothing to play
                    return ""
                return json.loads(raw).get(self._fngetter[2], "")
            if kind == "text":
                return el.text.strip()
            # previously fell through to an UnboundLocalError
            raise ValueError(
                "unknown bsreader_fngetter kind: {!r}".format(kind))

        def medias(self):
            """Yield a media for every element reached by descending
            through the find criteria, one level per criteria entry."""
            def _dive(el, depth, criterias):
                if depth < len(criterias):
                    crt = criterias[depth]
                    for ch in el.find_all(**crt):
                        for media in _dive(ch, depth + 1, criterias):
                            yield media
                    return
                fn = self._extract_fn(el)
                if self._fnconv:
                    # SECURITY: "bsreader_fnconv" is evaluated as a Python
                    # expression (it can refer to ``fn`` and ``el``).
                    # Only ever pass trusted values for this option.
                    fn = eval(self._fnconv)
                if not fn or not self._fnpattern.match(fn):
                    return
                if self._fnrejpattern and self._fnrejpattern.match(fn):
                    return
                fn = self.filename_conv(fn, self._base)
                if not fn:
                    return
                yield _MediaDict(filename=fn)

            # either one criteria chain, or a list of chains
            if isinstance(self._findcriterias[0], (list,)):
                criterias_list = self._findcriterias
            else:
                criterias_list = [self._findcriterias]
            for criterias in criterias_list:
                for media in _dive(self._soup, 0, criterias):
                    yield media

    def reader(self, fn, mandbase="", **options):
        """Read *fn* and return a reader (``R``) over its content."""
        # close the file instead of leaving it to the garbage collector
        with open_textfile(fn) as fi:
            cont = fi.read()
        return BSHTML_READER.R(fn, cont, mandbase, **options)
class R_BASE_ETREE(R_BASE):
    """Base class of readers whose playlist content is XML.

    After construction ``_root`` is the parsed document root wrapped in
    ``ETWrapper`` and ``_ns`` is a mapping built from the namespaces found
    in the document.
    """
    def __init__(self, fn, cont, mandbase="", **options):
        super().__init__(fn, cont, mandbase, **options)
        try:
            parsed = ET_fromstring(self._cont)
        except ElementTree.ParseError as eo:
            # Malformed XML: retry with lxml's recovering parser when lxml
            # is installed, otherwise re-raise the original parse error.
            try:
                from lxml import etree
                _log.warning(
                    "Detected something is wrong in your %r " +
                    "with the xml content.\n" +
                    " %r\n" +
                    "You have lxml, so let's recover it with lxml.", fn, eo)
                parser = etree.XMLParser(recover=True)
                parsed = ET_fromstring(self._cont, parser)
            except ImportError:
                raise eo
        self._root = ETWrapper(parsed)
        # NOTE(review): _namespaces is a private ElementTree helper; the
        # mapping it returns is inverted here (values become keys).
        # Presumably this yields prefix -> URI for use as the "namespaces"
        # argument of find()/findall() -- confirm.
        from xml.etree.ElementTree import _namespaces
        self._ns = {
            v: k
            for k, v in _namespaces(self._root)[1].items()}
class R_BASE_ATOM(R_BASE_ETREE):
    """Shared helpers for feed readers (RSS / Atom / Media RSS).

    ``_ns_prefs`` maps a logical namespace name ("atom", "media", "rss",
    "content", "dc", "rdf") to the ``prefix:`` under which it appears in
    the document ("" when absent), for use in ``_xpath`` templates.
    """

    def __init__(self, fn, cont, mandbase="", **options):
        super().__init__(fn, cont, mandbase, **options)
        self._ns_prefs = dict(atom="", media="")
        # classify each declared namespace by its URI
        for n, u in self._ns.items():
            pref = "{}:".format(n)
            if "/mrss" in u:
                self._ns_prefs["media"] = pref
            elif "/Atom" in u:
                self._ns_prefs["atom"] = pref
            elif re.match(r".*/rss/[\d.]+/?$", u):
                self._ns_prefs["rss"] = pref
            elif re.match(r".*/rss/[\d.]+/.*/content/?", u):
                self._ns_prefs["content"] = pref
            elif "/dc/" in u:
                self._ns_prefs["dc"] = pref
            elif "-rdf-" in u:
                self._ns_prefs["rdf"] = pref

    def _xpath(self, tmpl):
        """Fill ``{name}`` placeholders in *tmpl* with namespace prefixes;
        names without a known prefix become the empty string."""
        d = self._ns_prefs.copy()
        while True:
            try:
                return tmpl.format(**d)
            except KeyError as e:
                if not e.args:
                    raise
                # unknown namespace: substitute "" and retry
                d[e.args[0]] = ""

    def _fromelm(self, el, baseinf):
        """Build a media from element *el* ("url" or "href" attribute)
        merged with *baseinf*; returns ``None`` when *el* has no location.
        """
        fn = el.attrib.get("url")
        if not fn:
            fn = el.attrib.get("href")
        if fn:
            # The "length" / "fileSize" attributes are deliberately not
            # mapped to file_size: per RFC it is a content length, but
            # some feeds (podcastgarden) put a duration [ms] there.
            media = _MediaDict(
                filename=self.filename_conv(fn),
            )
            media.update(**baseinf)  # title, pubdate, description, ...
            media["bitrate"] = el.attrib.get("bitrate")
            media["mimetype"] = el.attrib.get("type")
            return media

    def _from_mrssitems(self, ite, baseinf, dejav):
        """Yield medias for the Media RSS content of item *ite*.

        *dejav* is the set of file names already emitted; it is updated so
        that the same file is yielded only once.
        """
        # <media:content> directly under the item
        for el in list(ite.findall(
                self._xpath("{media}content"), self._ns)):
            media = self._fromelm(el, baseinf)
            if media and media["filename"] not in dejav:
                dejav.add(media["filename"])
                yield media
        # <media:group> elements; findall() is a generator, so it has to
        # be materialised before it can be tested for emptiness.
        gels = list(ite.findall(self._xpath("{media}group"), self._ns))
        if not gels:
            return
        for g in gels:
            # start every group from the item's info so a title or
            # description of one group does not leak into the next
            bi = dict(baseinf)
            conts = []
            for mel in list(g):
                if mel.tag.endswith("}title"):
                    bi["title"] = mel.text_stripped
                elif mel.tag.endswith("}description"):
                    bi["description"] = mel.text_stripped
                elif mel.tag.endswith("}content"):
                    conts.append(mel)
            for celm in conts:
                media = self._fromelm(celm, bi)
                if media and media["filename"] not in dejav:
                    dejav.add(media["filename"])
                    yield media

    def playlist_meta(self):
        """Return the feed title / description as playlist metadata."""
        res = _NoNullDict()
        # try plain RSS, then Atom, then namespaced RSS
        telm = self._root.find("./channel/title")
        if not telm:
            telm = self._root.find(self._xpath("{atom}title"), self._ns)
        if not telm:
            telm = self._root.find(
                self._xpath("{rss}channel/{rss}title"), self._ns)
        res["playlist_title"] = telm.text_stripped
        description = self._root.find(
            self._xpath("{rss}channel/{rss}description"),
            self._ns).text_stripped
        if description:
            res["playlist_description"] = description
        return res
class TXT(W_BASE):
    """Simple filelist"""
    # Output: one normalised file name per line, written as utf-8.
    encoding = "utf-8"
    template = Template(dedent("""\
        % for i, media in enumerate(medias):
        ${normpath(
            media["filename"],
            playlist_outdir=playlist_outdir, **normpath_options)}
        % endfor
        """))

    class R(R_BASE):
        def medias(self):
            """Yield one media per non-empty line of the content."""
            for line in re.split(r"[ \t]*\r?\n", self._cont):
                r = self.filename_conv(line)
                if not r:
                    continue
                yield _MediaDict(filename=r)

    def reader(self, fn, mandbase="", **options):
        """Read *fn* and return a reader (``R``) over its content."""
        # close the file instead of leaving it to the garbage collector
        with open_textfile(fn) as fi:
            cont = fi.read()
        return TXT.R(fn, cont, mandbase, **options)
class RAM(TXT):
    """Real Audio Metadata (RAM)"""
    class R(TXT.R):
        def __init__(self, fn, cont, mandbase="", **options):
            # strip "#" comments (through the end of line, together with
            # the blank lines that follow) before parsing as a file list
            cont = re.sub(r"\s*#[^\n]*(\r?\n)+", r"\n", cont)
            super().__init__(fn, cont, mandbase, **options)

    def reader(self, fn, mandbase="", **options):
        """Read *fn* and return a reader (``R``) over its content."""
        # close the file instead of leaving it to the garbage collector
        with open_textfile(fn) as fi:
            cont = fi.read()
        return RAM.R(fn, cont, mandbase, **options)
class REGEXP_READER(object):
    """
    This reader is not chosen by an input-based decision and is only
    used by explicit instantiation.

      $ python3 mpplgen.py \
          --from_l=pl.smp \
          --li=REGEXP_READER \
          --additional_reader_params='{
            "regexpreader_regexp_playlist_title": "//Title (.*) Title",
            "regexpreader_regexp_mediaitem_filename": "^File=(.*)$",
            "regexpreader_regexp_mediaitem_title": "^Title=(.*)$",
            "regexpreader_regexp_mediaitem_duration": "^Time=(.*)$",
            "regexpreader_mediaitem_lastkey": "duration"
          }'
    """
    class R(R_BASE):
        def __init__(self, fn, cont, mandbase="", **options):
            super().__init__(fn, cont, mandbase, **options)
            self._regexes_playlist = {}
            self._regexes_media = {}
            # the media key whose match completes one media item
            self._mediaitem_lastkey = options.get(
                "regexpreader_mediaitem_lastkey", "filename")
            for k in options.keys():
                # Match the whole option name: an unanchored "[a-z]+"
                # truncated keys containing "_" or digits (for example
                # "duration_ms" was registered as "duration").
                m1 = re.match(
                    r"regexpreader_regexp_mediaitem_([a-z0-9_]+)$", k)
                m2 = re.match(
                    r"regexpreader_regexp_(playlist_[a-z0-9_]+)$", k)
                if m1:
                    t = self._regexes_media
                    key = m1.group(1)
                elif m2:
                    t = self._regexes_playlist
                    key = m2.group(1)
                else:
                    continue
                regexp = options[k]
                t[key] = re.compile(regexp)

        def playlist_meta(self):
            """Search the whole content with every playlist regex; group 1
            of each hit becomes the value of the corresponding key."""
            result = _NoNullDict()
            for k, rgx in self._regexes_playlist.items():
                found = rgx.search(self._cont)
                if found:
                    result[k] = found.group(1)
            return result

        def medias(self):
            """Scan the content line by line, collecting media fields; a
            media is yielded each time the "last key" field is seen."""
            media = _MediaDict()
            for line in re.split(r"[ \t]*\r?\n", self._cont):
                for k, rgx in self._regexes_media.items():
                    m = rgx.match(line)
                    if m:
                        v = m.group(1)
                        if k == "filename":
                            v = self.filename_conv(v)
                        media[k] = v
                        if k == self._mediaitem_lastkey:
                            yield media
                            media = _MediaDict()
                        # the first matching regex wins for this line
                        break

    def reader(self, fn, mandbase="", **options):
        """Read *fn* and return a reader (``R``) over its content."""
        # close the file instead of leaving it to the garbage collector
        with open_textfile(fn) as fi:
            cont = fi.read()
        return REGEXP_READER.R(fn, cont, mandbase, **options)
class CSV(object):
    """
    General csv.
    """
    class R(R_BASE):
        def medias(self):
            """Yield one media per data row.

            The first row is the header; header names can be renamed with
            the "csvreader_keymap" option and a "filename" column is
            required (``ValueError`` otherwise).  Blank lines and rows
            without a file name are skipped.
            """
            #
            keymap = self._options.get("csvreader_keymap", {})
            delim = self._options.get("csvreader_delimiter", ",")
            #
            reader = csv.reader(
                re.split(r"[ \t]*\r?\n", self._cont), delimiter=delim)
            keys = next(reader)
            keys = [keymap.get(k, k) for k in keys]
            if "filename" not in keys:
                raise ValueError("filename field is required in csv")
            for row in reader:
                d = dict(zip(keys, row))
                # csv.reader yields [] for a blank line (there is always
                # one after a trailing newline); that used to end in
                # KeyError: 'filename'.
                if not d.get("filename", "").strip():
                    continue
                d["filename"] = self.filename_conv(d["filename"])
                yield _MediaDict(d)

    def reader(self, fn, mandbase="", **options):
        """Read *fn* and return a comma-delimited reader over it."""
        options["csvreader_delimiter"] = ","
        # close the file instead of leaving it to the garbage collector
        with open_textfile(fn) as fi:
            cont = fi.read()
        return CSV.R(fn, cont, mandbase, **options)
class TSV(CSV):
    """
    General tsv.
    """

    def reader(self, fn, mandbase="", **options):
        """Return a ``CSV.R`` reader configured with a tab delimiter."""
        opts = dict(options, csvreader_delimiter="\t")
        return CSV.R(fn, open_textfile(fn).read(), mandbase, **opts)
class AUDPL(W_BASE):
    """Audacious format."""
    encoding = None
    template = None

    class R(R_BASE):
        def __init__(self, fn, cont, mandbase="", **options):
            super().__init__(fn, cont, mandbase, **options)
            # Leading "title=" lines form the playlist header; everything
            # from the first other non-empty line on is the track body.
            header, body = [], []
            in_body = False
            for line in re.split(r"[ \t]*\r?\n", self._cont):
                if not line:
                    continue
                if not in_body and not line.startswith("title="):
                    in_body = True
                (body if in_body else header).append(line)
            self._sections = [header, body]

        def playlist_meta(self):
            """Return the playlist title taken from the header lines."""
            res = _NoNullDict()
            for line in self._sections[0]:
                m = re.match(r"title=(.*)", line)
                if m:
                    res["playlist_title"] = urlunquote(m.group(1))
            return res

        def medias(self):
            """Yield one media per "uri=" line with the keys following it."""
            renames = {
                "uri": "filename",
                "track-number": "tracknumber",
                "length": "duration_ms",
            }
            media = _MediaDict()
            for line in self._sections[1]:
                m = re.match(r"([^=]+)=(.*)", line)
                if not m:
                    continue
                key, raw = m.group(1, 2)
                key = renames.get(key, key)
                value = urlunquote(raw)
                if key == "filename":
                    value = self.filename_conv(value)
                    if "filename" in media:
                        # A new "uri=" closes the media collected so far.
                        yield media
                        media = _MediaDict()
                media[key] = value
            if "filename" in media:
                yield media

    def reader(self, fn, mandbase="", **options):
        """Return an ``AUDPL.R`` for the file *fn*."""
        text = open_textfile(fn, ["uri="]).read()
        return AUDPL.R(fn, text, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether the content of *fn* contains "uri="."""
        return "uri=" in open_textfile(fn, ["uri="]).read()
class OBJECT(object):
    """
    JSON.
    """
    encoding = None
    template = None

    class R(R_BASE):
        def __init__(self, fn, cont, mandbase="", **options):
            super().__init__(fn, cont, mandbase, **options)
            # NOTE: "loader" is a Python expression evaluated with `cont`
            # in scope. OBJECT.reader() always sets it itself; never feed
            # an externally supplied string here.
            self._obj = eval(options["loader"])

        def medias(self):
            """Yield a media per row of the loaded object.

            Keys are renamed through the optional ``objreader_keymap``
            option; a row without "filename" raises ``ValueError``.
            """
            keymap = self._options.get("objreader_keymap", {})
            for row in self._obj:
                d = dict(
                    (keymap.get(k, k), v) for k, v in row.items())
                if "filename" not in d:
                    raise ValueError("filename field is required")
                d["filename"] = self.filename_conv(d["filename"])
                yield _MediaDict(d)

    def reader(self, fn, mandbase="", **options):
        """Return an ``OBJECT.R`` for *fn*; only ".json" is supported."""
        ext = os.path.splitext(fn)[1]
        if ext.lower() != ".json":
            # (A yaml loader, "yaml.full_load_all(io.StringIO(cont))",
            # was considered here but is not enabled.)
            raise ValueError("{} is not supported".format(ext))
        options["loader"] = "json.loads(cont)"
        text = open_textfile(fn).read()
        return OBJECT.R(fn, text, mandbase, **options)

    reader_warning = _wrap_longwarn("""\
This reader is so limited that it's probably far from what you want.
If you are not satisfied with the result, consider using
--from_generator_module.
""")
class FFCONCAT(W_BASE):
    """Virtual concatenation script demuxer of ffmpeg."""
    encoding = "utf-8"
    # shlex.quote is the documented spelling of the former pipes.quote
    # (the "pipes" module no longer exists in Python 3.13).
    template = Template(dedent("""\
    <%!import shlex%>\\
    # Generated by: ${__MYNAME__} ${__VERSION__}
    % for i, media in enumerate(medias):
    file ${shlex.quote(normpath(
    media["filename"],
    playlist_outdir=playlist_outdir, **normpath_options))}
    % endfor
    """))

    class R(R_BASE):
        def medias(self):
            """Yield a media for every ``file <path>`` directive.

            A path that starts with a quote is unquoted with shell rules
            (the inverse of what the writer template does). The text is
            never evaluated as Python code, and backslashes inside single
            quotes are kept literally.
            """
            import shlex

            for line in re.split(r"[ \t]*\r?\n", self._cont):
                m = re.match(r"file\s+(.*)", line.rstrip())
                if not m:
                    continue
                s = m.group(1)
                if s[0] in ("'", '"'):
                    try:
                        tokens = shlex.split(s)
                    except ValueError:
                        # e.g. a missing closing quote
                        _log.warning(
                            "Malformed quoting in ffconcat entry %r", s)
                        continue
                    filename = tokens[0] if tokens else ""
                else:
                    filename = s
                if filename:
                    yield _MediaDict(
                        filename=self.filename_conv(filename))

    def reader(self, fn, mandbase="", **options):
        """Return a ``FFCONCAT.R`` for the file *fn*."""
        cont = open_textfile(fn, ["file"]).read()
        return FFCONCAT.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* has a line starting with a "file" directive."""
        cont = open_textfile(fn, ["file"]).read()
        return re.search(r"^file\s+", cont, flags=re.M)
class PLP(W_BASE):
    r"""
    Sandisk Sansa playlist

    The pla file extension is associated with the Sandisk Sansa mp3
    players. This type of file is no longer actively used and is most likely
    obsolete.

    This playlist format is tightly coupled to the Sandisk device's folder
    structure and is not intended for use on other devices (eg Windows system
    hard disks). For example, "HARP, MUSIC/" means that its media is stored
    in the Sandisk-managed MUSIC folder, that is, "MUSIC" is not a
    "magic number" but a simply location in Sandisk. However, some playlist
    manager writes "C:\Windows\Media\Alarm01.wav" as
    "HARP, MUSICC:\Windows\Media\Alarm01.wav", for example. We cannot
    automatically correct this error.

    The extension of this is a bit confusing, originally it seems to be ".pla",
    but some software considers it to be ".plp".
    """
    encoding = None
    template = None

    class R(R_BASE):
        def medias(self):
            """Yield a media for each "<device>, <path>" line.

            Both header lines ("PLP PLAYLIST" and "VERSION x.y") must have
            been seen before the first entry, otherwise ``ValueError`` is
            raised.
            """
            headers_seen = 0
            for line in re.split(r"[ \t]*\r?\n", self._cont):
                if line == "PLP PLAYLIST" or \
                   re.match(r"VERSION \d\.\d+", line):
                    headers_seen += 1
                    continue
                if not line:
                    continue
                if not (headers_seen == 2 and "," in line):
                    raise ValueError("Malformed PLP playlist")
                path = line.partition(",")[2].strip()
                # path will be "MUSICc:\Windows\Media", but we won't fix it.
                _log.warning(
                    "You will have to fix path pointed Sandisk location %r",
                    path)
                converted = self.filename_conv(path)
                if converted:
                    yield _MediaDict(filename=converted)

    def reader(self, fn, mandbase="", **options):
        """Return a ``PLP.R`` for the file *fn*."""
        text = open_textfile(fn, ["PLP"]).read()
        return PLP.R(fn, text, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* contains the "PLP PLAYLIST" marker."""
        return "PLP PLAYLIST" in open_textfile(fn, ["PLP"]).read()
# Mako template text shared by the SMIL-derived writers (WPL and ZPL).
# Each writer prepends its own processing-instruction line and replaces the
# "__MEDIAELEMTAG__" placeholder with the element name it uses for a media.
# The "duration" attribute is emitted only when playlist_ext is ".zpl".
SMILFAMILY_COMMON_TEMPLATE = """\
<%page expression_filter="x"/>\\
<smil>
<head>
<meta name="Generator" content="${__MYNAME__} -- ${__VERSION__}"/>
<meta name="ItemCount" content="${len(medias)}"/>
<title>${playlist_title}</title>
</head>
<body>
<seq>
% for i, media in enumerate(medias):
<__MEDIAELEMTAG__ src="${normpath(
media["filename"],
playlist_outdir=playlist_outdir, **normpath_options)}" \\
% if playlist_ext == ".zpl" and "{}".format(media.get("duration_ms", "")):
duration="${media['duration_ms']}" \\
% endif
/>
% endfor
</seq>
</body>
</smil>
"""
class SMIL(W_BASE):
    """
    Playlist feature part of 'Synchronized Multimedia Integration Language'.

    The smil-style playlists that each player expects seem to vary.
    We prefer to treat it as read-only rather than trying to output it.
    """
    # encoding = "utf-8"
    # template = Template(
    #     '<?xml version="1.0"?>\n' + SMILFAMILY_COMMON_TEMPLATE.replace(
    #         "__MEDIAELEMTAG__", "ref"))
    encoding = None
    template = None

    class R(R_BASE_ETREE):
        def playlist_meta(self):
            """Return the playlist title found at ./head/title."""
            return _NoNullDict(
                playlist_title=self._root.find(
                    "./head/title").text_stripped)

        def medias(self):
            """Yield the media elements of <body>, descending into
            <seq>/<par>/<switch> containers in document order."""
            base = ""
            for melem in self._root.findall("./head/meta"):
                # The last <meta base="..."> wins.
                base = melem.attrib.get("base", base)
            cntntags = ("seq", "par", "switch")
            meditags = ("media", "ref", "audio", "video", "img")

            def _media(elem):
                """Build a media from *elem*, or None if it has no src."""
                src = elem.attrib.get("src")
                if not src:
                    return None
                r = _MediaDict(filename=self.filename_conv(src, base))
                if "duration" in elem.attrib:
                    r["duration_ms"] = elem.attrib.get("duration")
                elif "dur" in elem.attrib:
                    r["duration"] = elem.attrib.get("dur")
                r["bitrate"] = elem.attrib.get("system-bitrate")
                return r

            def _dive(contelem):
                for el in list(contelem):
                    if el.tag in cntntags:
                        for media in _dive(el):
                            yield media
                    elif el.tag in meditags:
                        media = _media(el)
                        if media:
                            yield media

            # <body> is handled like any other container element.
            for media in _dive(self._root.find("./body")):
                yield media

    def reader(self, fn, mandbase="", **options):
        """Return a ``SMIL.R`` for the file *fn*."""
        cont = open_textfile(fn, ["</"]).read()
        return SMIL.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* looks like a plain SMIL playlist.

        Content carrying a "<?wpl" or "<?zpl" processing instruction is
        left to the WPL/ZPL readers. The "?" is escaped so that only the
        literal instruction matches, not any text containing "wpl ".
        """
        cont = open_textfile(fn, ["</"]).read()
        return not re.search(r"<\?[wz]pl\s", cont) and \
            re.search(r"<smil\b", cont) and re.search(r"<ref\s", cont)
class WPL(SMIL):
    """Windows Media Player Playlist"""
    encoding = "utf-8"
    template = Template(
        '<?wpl version="1.0"?>\n' + SMILFAMILY_COMMON_TEMPLATE.replace(
            "__MEDIAELEMTAG__", "media"))

    def reader(self, fn, mandbase="", **options):
        """Return a ``WPL.R`` for the file *fn*."""
        cont = open_textfile(fn, ["</"]).read()
        return WPL.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* carries the "<?wpl" processing instruction."""
        cont = open_textfile(fn, ["</"]).read()
        # "?" must be escaped: unescaped it only makes the "<" optional,
        # so any text containing "wpl " would be accepted.
        return re.search(r"<\?wpl\s", cont)
class ZPL(WPL):
    """
    A format used by Zune Media Player, Zoom Player and Creative Zen Media
    Players. (outdated players.)
    """
    encoding = NATIVE_ENCODING
    template = Template(
        '<?zpl version="2.0"?>\n' + SMILFAMILY_COMMON_TEMPLATE.replace(
            "__MEDIAELEMTAG__", "media"))

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* carries the "<?zpl" processing instruction."""
        cont = open_textfile(fn, ["</"]).read()
        # "?" must be escaped: unescaped it only makes the "<" optional,
        # so any text containing "zpl " would be accepted.
        return re.search(r"<\?zpl\s", cont)

    writer_warning = _wrap_longwarn("""\
It is not recommended to output as ".zpl".
This format is simply outdated, it was used by Microsoft's Zune in
the past.""")
class B4S(W_BASE):
    """Winamp 3+ (outdated)"""
    encoding = "utf-8"
    template = Template(dedent('''\
    <%page expression_filter="x"/>\\
    <?xml version="1.0" encoding='utf-8' standalone="yes"?>
    <WasabiXML>
    <!-- Generated by: ${__MYNAME__} ${__VERSION__} -->
    <playlist num_entries="${len(medias)}" label="${playlist_title}">
    % for i, media in enumerate(medias):
    <%
    filename = normpath(
    media["filename"],
    playlist_outdir=playlist_outdir, **normpath_options)
    %>\\
    <entry Playstring="${filename}" \\
    relative="${(not os.path.isabs(filename)) * 1}">
    <Name>${media.get("artist")} - ${media.get("title")}</Name>
    % if "artist" in media:
    <Artist>${media["artist"]}</Artist>
    % endif
    <Title>${media.get_title()}</Title>
    % if "album" in media:
    <Album>${media["album"]}</Album>
    % endif
    % if "{}".format(media.get("duration_ms", "")):
    <Length>${media["duration_ms"]}</Length>
    % endif
    </entry>
    % endfor
    </playlist>
    </WasabiXML>
    '''))
    writer_warning = _wrap_longwarn("""\
It is not recommended to output as ".b4s".
This format is simply outdated, it was used by winamp in
the past.""")

    class R(R_BASE_ETREE):
        # (child element name, media key) pairs copied from each <entry>
        _FIELDS = (
            ("Artist", "artist"),
            ("Title", "title"),
            ("Album", "album"),
            ("Length", "duration_ms"),
        )

        def playlist_meta(self):
            """Return the "label" attribute of <playlist> as the title."""
            meta = _NoNullDict()
            meta["playlist_title"] = self._root.find(
                "./playlist").attrib.get("label", "")
            return meta

        def medias(self):
            """Yield a media for each <entry> that has a Playstring."""
            for entry in self._root.findall("./playlist/entry"):
                src = entry.attrib.get("Playstring")
                if not src:
                    continue
                media = _MediaDict(filename=self.filename_conv(src))
                for tag, key in self._FIELDS:
                    media[key] = entry.find(tag).text_stripped
                yield media

    def reader(self, fn, mandbase="", **options):
        """Return a ``B4S.R`` for the file *fn*."""
        text = open_textfile(fn, ["</"]).read()
        return B4S.R(fn, text, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* contains a "<WasabiXML" element."""
        return re.search(r"<WasabiXML", open_textfile(fn, ["</"]).read())
class ASX(W_BASE):
    """Windows Media Player"""
    encoding = NATIVE_ENCODING
    template = Template(dedent('''\
    <%page expression_filter="x"/>\\
    <asx version = "3.0">
    <!-- Generated by: ${__MYNAME__} ${__VERSION__} -->
    <param name="Encoding" value="${playlist_encoding}"/>
    <title>${playlist_title}</title>
    % for i, media in enumerate(medias):
    <entry>
    <title>${media.get_title()}</title>
    % if "{}".format(media.get("duration", "")):
    <duration value="${ts_to_tss(float(media["duration"]))}" />
    % endif
    <ref href = "${normpath(
    media["filename"],
    playlist_outdir=playlist_outdir, **normpath_options)}"/>
    </entry>
    % endfor
    </asx>
    '''))
    writer_warning = _wrap_longwarn("""\
It is not recommended to output as ".asx".
Windows Media Player is almost the only player that
can handle asx correctly as designed, and 'audacious'
in particular treats it in a fairly selfish, and a
completely non-portable way.""")

    @staticmethod
    def _readcont(fn):
        """Return the text of *fn*, decoded with the encoding it declares.

        The encoding is taken from ``<param name="Encoding" value=...>``
        or, failing that, from the XML declaration; NATIVE_ENCODING is
        the fallback.
        """
        fn = _urlretrieve(fn)
        with io.open(fn, "rb") as fp:
            bcont = fp.read()
        bcont = re.sub(br"[\r\n]+", b" ", bcont)
        m1 = re.search(
            br'''<param\s+name\s*=\s*
            ["']Encoding["']\s+value\s*=\s*["']([\w-]+)["']\s*/>''',
            bcont, flags=re.I | re.X)
        m2 = re.search(
            br'''<\?xml\s+[^<>]*
            encoding\s*=\s*["']([\w-]+)["']\s*\?>''',
            bcont, flags=re.I | re.X)
        enc = NATIVE_ENCODING
        if m1:
            enc = m1.group(1).decode()
        elif m2:
            enc = m2.group(1).decode()
        with io.open(fn, encoding=enc) as fp:
            return fp.read()

    class R(R_BASE_ETREE):
        def __init__(self, fn, cont, mandbase="", **options):
            super().__init__(fn, cont, mandbase, **options)
            self._elems = list(self._root)

        def playlist_meta(self):
            """Return the text of the top-level <title> as the title."""
            result = _NoNullDict()
            for e in self._elems:
                if e.tag_lc == "title":
                    result["playlist_title"] = e.text_stripped
            return result

        def medias(self):
            """Yield medias from <entry>, <entryref> and <repeat> elements."""
            def _entries(elems):
                for ee in elems:
                    if ee.tag_lc == "entry":
                        ent = _MediaDict()
                        for er in list(ee):
                            if er.tag_lc == "ref":
                                ent["filename"] = er.attrib_lc.get("href")
                            elif er.tag_lc == "base":
                                ent["base"] = er.attrib_lc.get("href")
                            elif er.tag_lc == "title":
                                ent["title"] = er.text_stripped
                            elif er.tag_lc == "duration":
                                ent["duration"] = er.attrib_lc.get("value")
                        # An entry-level <base> overrides the global one.
                        base = self._baseg
                        if "base" in ent:
                            base = ent.pop("base")
                        if "filename" in ent:
                            ent["filename"] = self.filename_conv(
                                ent["filename"], base)
                            yield ent
                    elif ee.tag_lc == "entryref":
                        v = ee.attrib_lc.get("href")
                        if v:
                            mb = os.path.dirname(v)
                            # R takes (fn, cont, mandbase): the referenced
                            # file name must be passed before its content.
                            r = ASX.R(
                                v, ASX._readcont(v), mb, **self._options)
                            for ent in r.medias():
                                yield ent
                    elif ee.tag_lc == "repeat":
                        v = ee.attrib_lc.get("count")
                        cnt = None
                        if v is None:
                            # No count means "forever"; approximate it.
                            cnt = 10
                            _log.warning(
                                "Repeating infinitely is not supported.")
                        else:
                            try:
                                cnt = int(v)
                                if cnt == 0:
                                    cnt = 1
                            except ValueError:
                                _log.warning("Invalid count for <repeat>.")
                        if cnt is not None:
                            for i in range(cnt):
                                for ent in _entries(list(ee)):
                                    yield ent
            #
            self._baseg = None
            for ee in self._elems:
                if ee.tag_lc == "base":
                    v = ee.attrib_lc.get("href")
                    if v:
                        self._baseg = v
            for ent in _entries(self._elems):
                yield ent

    def reader(self, fn, mandbase="", **options):
        """Return a reader for *fn* (ASXINI's one for the ini variant)."""
        if ASXINI.accept_as_reader(fn):
            return ASXINI().reader(fn, mandbase, **options)
        return ASX.R(fn, ASX._readcont(fn), mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* is an ASX document (xml or ini flavour)."""
        if ASXINI.accept_as_reader(fn):
            return True
        # Require the opening "<" of the <asx ...> element; the former
        # pattern "<?asx\s" made it optional and matched any "asx ".
        return re.search(r"<asx\s", ASX._readcont(fn), flags=re.I)
class MPL(W_BASE):
    """JRiver Media Center Playlist"""
    encoding = "utf-8"
    template = Template(dedent('''\
    <%page expression_filter="x"/>\\
    <?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
    <!-- Generated by: ${__MYNAME__} ${__VERSION__} -->
    <MPL Version="2.0" Title="${playlist_title}" PathSeparator="/">
    % for i, media in enumerate(medias):
    <Item>
    <Field Name="Filename">${normpath(
    media["filename"],
    playlist_outdir=playlist_outdir, **normpath_options)}</Field><%
    keys = [
    (fk, " ".join(list(map(lambda s: s.title(), fk.split("_")))))
    for fk in media.keys() if fk != "filename"]
    acc = {
    "Title": "Name",
    "Artist": "Artist",
    "Album": "Album",
    "Genre": "Genre",
    "Year": "Date (readable)",
    "Bitrate": "Bitrate",
    "Duration": "Duration",
    "Media Type": "Media Type",
    "File Type": "File Type",
    "File Size": "File Size",
    "Sample Rate": "Sample Rate",
    "Channels": "Channels",
    "Bitspersample": "Bit Depth",
    "Tracknumber": "Track #",
    "Date Created": "Date Created",
    }
    keys = [(fk, acc.get(fkd)) for (fk, fkd) in keys if fkd in acc]
    %>
    % for (fk, fkd) in keys:
    % if "{}".format(media.get(fk, "")):
    % if fkd == "Date Created":
    <Field Name="${fkd}">${int(dateutil.parser.parse(
    media[fk]).timestamp())}</Field>
    % else:
    <Field Name="${fkd}">${media[fk]}</Field>
    % endif
    % endif
    % endfor
    </Item>
    % endfor
    </MPL>
    '''))

    class R(R_BASE_ETREE):
        # Normalized <Field Name> values that map to a different media key.
        # maybe
        #   <Field Name="Compression">
        #   CBR (MPEG-1 Layer 3)
        #   </Field>
        # means "codec_profile" in fpl plus human readable
        # "codec" in fpl...?
        _RENAMES = {
            "name": "title",
            "track_#": "tracknumber",
            "bit_depth": "bitspersample",
            "date_(readable)": "year",
        }

        def playlist_meta(self):
            """Return the "Title" attribute of the root element."""
            return _NoNullDict(
                playlist_title=self._root.attrib.get("Title"))

        def medias(self):
            """Yield a media for each <Item> that provides a filename."""
            for item in self._root.findall("./Item"):
                media = _MediaDict()
                for field in item.findall("Field"):
                    name = field.attrib.get("Name")
                    if not name:
                        continue
                    value = field.text_stripped
                    if not value:
                        continue
                    # "Sample Rate" -> "sample_rate", then known renames
                    key = name.lower().replace(" ", "_")
                    key = self._RENAMES.get(key, key)
                    if key == "filename":
                        value = self.filename_conv(value)
                    media[key] = value
                if "filename" in media:
                    yield media

    def reader(self, fn, mandbase="", **options):
        """Return a ``MPL.R`` for the file *fn*."""
        text = open_textfile(fn, ["</"]).read()
        return MPL.R(fn, text, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* contains a "<MPL " element."""
        return re.search(r"<MPL ", open_textfile(fn, ["</"]).read())
class KPL(W_BASE):
    """
    Kalliope PlayList

    Apparently Kalliope is no longer an active project. The playlist format
    is unique, it's a fake xml, not a well-formed one. We prefer to treat it
    as read-only rather than trying to output it.
    """

    class R(R_BASE_ETREE):
        def __init__(self, fn, cont, mandbase="", **options):
            # Element names are bare numbers (<1 ...>), which is not valid
            # xml; prefix them so that the content can be parsed.
            patched = re.sub(r"(</?)(\d+\b)", r"\1_ENTRY_\2", cont)
            super().__init__(fn, patched, mandbase, **options)

        def medias(self):
            """Return the medias ordered by their numeric element name."""
            by_index = {}
            for el in list(self._root):
                m = re.match(r"_ENTRY_(\d+)", el.tag)
                if not m:
                    continue
                fn = el.attrib.get("filename")
                if not fn:
                    continue
                media = _MediaDict(filename=self.filename_conv(fn.strip()))
                for k, v in el.find("tag").attrib.items():
                    media[{"track": "tracknumber"}.get(k, k)] = v.strip()
                by_index[int(m.group(1))] = media
            return (by_index[n] for n in sorted(by_index))

    def reader(self, fn, mandbase="", **options):
        """Return a ``KPL.R`` for the file *fn*."""
        text = open_textfile(fn, ["</"]).read()
        return KPL.R(fn, text, mandbase, **options)
class M3U8(W_BASE):
    """Winamp"""
    encoding = "utf-8"
    # "filegroup" is always read with .get(): a media without that key
    # must not break the output once a group has been started.
    template = Template(dedent("""\
    % if extended_m3u:
    #EXTM3U
    % if extended_m3u_nostd1:
    #PLAYLIST:${playlist_title}
    % endif
    # Generated by: ${__MYNAME__} ${__VERSION__}
    % endif
    <%
    curg = ""
    %>\\
    % for i, media in enumerate(medias):
    % if extended_m3u:
    % if extended_m3u_nostd2 and curg != media.get("filegroup", ""):
    #EXTGRP:${media.get("filegroup", "")}
    <%
    curg = media.get("filegroup", "")
    %>\\
    % endif
    % if extended_m3u_nostd1 and "album" in media:
    #EXTALB:${media["album"]}
    % endif
    % if extended_m3u_nostd1 and "artist" in media:
    #EXTART:${media["artist"]}
    % endif
    % if extended_m3u_nostd1 and "genre" in media:
    #EXTGENRE:${media["genre"]}
    % endif
    % if extended_m3u_nostd1 and "file_size" in media:
    #EXTBYT:${media["file_size"]}
    % endif
    % if extended_m3u_nostd3 and "date_created" in media:
    #EXT-X-PROGRAM-DATE-TIME:${media["date_created"]}
    % endif
    #EXTINF:${to_int(media.get("duration", -1))},${media.get_title(True)}
    % endif
    ${normpath(
    media["filename"],
    playlist_outdir=playlist_outdir, **normpath_options)}
    % endfor
    """))

    class R(R_BASE):
        def __init__(self, fn, cont, mandbase="", **options):
            super().__init__(fn, cont, mandbase, **options)
            self._lines = [
                s.rstrip()
                for s in re.split(r"[ \t]*\r?\n", self._cont)]

        def playlist_meta(self):
            """Return the title given by a "#PLAYLIST:" line, if any."""
            res = _NoNullDict()
            for line in self._lines:
                m = re.match(r"#PLAYLIST:(.*)", line)
                if m:
                    res["playlist_title"] = m.group(1).strip()
            return res

        def medias(self):
            """Yield the medias listed in the playlist.

            "#EXT...:" directives preceding a path line are attached to
            that media. A path that is not a play target is read as a
            nested playlist whose medias are yielded in its place.
            """
            media = _MediaDict()
            fgr = ""
            for line in self._lines:
                # "#EXTM3U", "#EXTINF:..." are special comment.
                if not line or line == "#EXTM3U":
                    continue
                m = re.match(r"#EXT([A-Z-]+):(.*)", line)
                if line.startswith("#EXTINF:"):
                    # "<duration>,<title>": only the title is kept.
                    extinfstr = line[len("#EXTINF:"):]
                    media["title"] = extinfstr.partition(",")[2].strip()
                elif m:
                    g, v = m.group(1, 2)
                    g = g.lower()
                    k = {
                        "alb": "album",
                        "art": "artist",
                        "byt": "file_size",
                        "grp": "filegroup",
                        "-x-program-date-time": "date_created",
                    }.get(g, g)
                    if k == "filegroup":
                        # Applies to every following media.
                        fgr = v
                    else:
                        media[k] = v.strip()
                elif line[0] != "#":
                    media["filegroup"] = fgr
                    fn = line.strip()
                    ext, isplaytarg = _is_playtarget(fn)
                    if isplaytarg:
                        media["filename"] = self.filename_conv(fn)
                        yield media
                    else:
                        r = reader(
                            fn, os.path.dirname(fn),
                            fallback_txt=False,
                            **self._options)
                        if r:
                            for minn in r.medias():
                                yield minn
                    media = _MediaDict()

    def reader(self, fn, mandbase="", **options):
        """Return a ``M3U8.R`` for the file *fn*."""
        cont = open_textfile(fn, ["#EXTM3U", "#", "."]).read()
        return M3U8.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* has a line consisting of "#EXTM3U"."""
        cont = open_textfile(fn, ["#EXTM3U", "#", "."]).read()
        return re.search(r"^#EXTM3U\s*$", cont, flags=re.M)
# Same reader and writer as M3U8; only the output encoding differs.
class M3U(M3U8):
    """Winamp"""
    encoding = NATIVE_ENCODING
class PLS(W_BASE):
    """Legacy Winini-file based playlist."""
    encoding = NATIVE_ENCODING
    template = Template(dedent("""\
    [playlist]
    % for i, media in enumerate(medias):
    <%
    fn = media["filename"]
    title = media.get("title", path_noext(fn))
    %>\\
    File${i + 1}=${normpath(
    fn,
    playlist_outdir=playlist_outdir, **normpath_options)}
    Title${i + 1}=${title}
    % if "duration" in media:
    Length${i + 1}=${"{:.3f}".format(float(media["duration"]))}
    % endif
    % endfor
    NumberOfEntries=${len(medias)}
    Version=2
    """))
    writer_warning = _wrap_longwarn("""\
It is not recommended to output as ".pls".
This is because the assumed string encoding
differs depending on the player.
For example, MPC-HC can only read utf-8, but others don't.
(Many players assume that .pls is either written in the
default code page or in utf-8.) Also, the handling of comments
varies from player to player. Some players consider ";" to be
the start of a comment, while others treat ";File1=" as "File1="
(note that they don't even care about "NumberOfEntries").
The expansion of environment variables is also the cause of trouble.
It cannot be exchanged between Windows and Unix in the first place.
Many software can export this format as a playlist, but you
should never choose this format because of this situation.
""")

    class R(R_BASE):
        def medias(self):
            """Return the medias of the [playlist] section, by entry number.

            "FileN", "TitleN" and "LengthN" become the "filename",
            "title" and "duration" of the N-th media; entries without a
            filename are dropped.
            """
            cnfp = ConfigParser()
            if hasattr(cnfp, "read_string"):
                cnfp.read_string(self._cont)
            else:
                cnfp.readfp(io.BytesIO(self._cont))
            tmp = defaultdict(_MediaDict)
            sn = [
                sn for sn in cnfp.sections()
                if sn.lower() == "playlist"][0]
            # raw=True: values are taken literally. With interpolation a
            # "%" in a path or url ("100% hits.mp3", "a%20b.mp3") raises.
            for name, value in cnfp.items(sn, raw=True):
                m = re.match(r"([a-z]+)(\d+)", name)
                if m:
                    t, i = m.group(1, 2)
                    t = t.lower()
                    t = {
                        "file": "filename",
                        "length": "duration",
                    }.get(t, t)
                    n = value.strip()
                    if not n:
                        continue
                    if t == "filename":
                        n = os.path.expanduser(n)
                        n = os.path.expandvars(n)
                        n = self.filename_conv(n)
                    tmp[int(i)][t] = n
            return (tmp[k]
                    for k in sorted(tmp.keys())
                    if "filename" in tmp[k])

    def reader(self, fn, mandbase="", **options):
        """Return a ``PLS.R`` for the file *fn*."""
        cont = open_textfile(fn, ["["]).read()
        return PLS.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* has a "[playlist]" section line."""
        cont = open_textfile(fn, ["["]).read()
        return re.search(
            r"^\[playlist\]\s*$", cont, flags=re.M | re.I)
class ASXINI(W_BASE):
    """
    Previous Versions of Windows Media Metafiles (deprecated)

    Even the latest Windows Media Player doesn't handle this format correctly.
    It seems the latest Windows Media Player only reads the first item
    ("ref01"). Also, her judgment that it is "ref01" on the lhs is messed up,
    for example, "; akjlfdkipejfreF01 = aaa.mp3" is correctly regarded as
    "ref01=".

    Some third parties can export this format as a playlist, but you should
    never choose this format because of this situation.
    """
    encoding = None
    template = None

    class R(R_BASE):
        def medias(self):
            """Return the medias of the "refN = <target>" lines, by N."""
            by_index = defaultdict(_MediaDict)
            for line in re.split(r"[ \t]*\r?\n", self._cont):
                m = re.match(
                    r"^.*(ref)(\d+)\s*=\s*(.*)", line.rstrip(),
                    flags=re.I | re.M)
                if not m:
                    continue
                idx, target = m.group(2, 3)
                target = target.strip()
                if not target:
                    continue
                # Unlike PLS, "~" and environment variables are not
                # expanded here.
                by_index[int(idx)]["filename"] = self.filename_conv(target)
            return (by_index[k]
                    for k in sorted(by_index)
                    if "filename" in by_index[k])

    def reader(self, fn, mandbase="", **options):
        """Return an ``ASXINI.R`` for the file *fn*."""
        text = open_textfile(fn, ["["]).read()
        return ASXINI.R(fn, text, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* has a "[Reference]" section line.

        Content that cannot be decoded is reported as not acceptable.
        """
        try:
            text = open_textfile(fn, ["[", "</"]).read()
            return re.search(
                r"^\[Reference\]\s*$", text, flags=re.M | re.I)
        except UnicodeError:
            return False
class MPCPL(W_BASE):
    """MPC-HC playlist (or some variant such as MPC-BC)"""
    encoding = "utf-8"
    template = Template(dedent("""\
    MPCPLAYLIST
    % for i, media in enumerate(medias):
    ${i + 1},type,0
    ${i + 1},filename,${normpath(
    media["filename"],
    playlist_outdir=playlist_outdir, **normpath_options)}
    ${i + 1},label,${media.get_title()}
    % endfor
    """))

    class R(R_BASE):
        def medias(self):
            """Return the medias of the "<n>,<key>,<value>" lines, by n.

            "label" is stored as "title"; entries without a filename are
            dropped.
            """
            tmp = defaultdict(_MediaDict)
            for line in re.split(r"[ \t]*\r?\n", self._cont):
                # The key field must not swallow commas: the value (a
                # filename or a label) may itself contain some.
                m = re.match(r"(\d+),([^,]*),(.*)", line.rstrip())
                if m:
                    i, t, n = m.group(1, 2, 3)
                    t = t.lower()
                    t = {"label": "title"}.get(t, t)
                    n = n.strip()
                    if not n:
                        continue
                    if t == "filename":
                        n = self.filename_conv(n)
                    tmp[int(i)][t] = n
            return (tmp[k]
                    for k in sorted(tmp.keys())
                    if "filename" in tmp[k])

    def reader(self, fn, mandbase="", **options):
        """Return a ``MPCPL.R`` for the file *fn*."""
        cont = open_textfile(fn, ["MPCPLAYLIST"]).read()
        return MPCPL.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* has a line consisting of "MPCPLAYLIST"."""
        cont = open_textfile(fn, ["MPCPLAYLIST"]).read()
        return re.search(
            r"^MPCPLAYLIST\s*$", cont, flags=re.M)
class DPL(W_BASE):
    """Daum PotPlayer playlist ("DAUMPLAYLIST")"""
    encoding = "utf-8-sig"
    template = Template(dedent("""\
    DAUMPLAYLIST
    topindex=0
    saveplaypos=0
    % for i, media in enumerate(medias):
    ${i + 1}*file*${normpath(
    media["filename"],
    playlist_outdir=playlist_outdir, **normpath_options)}
    % endfor
    """))

    class R(R_BASE):
        def medias(self):
            """Return the medias of the "<n>*<key>*<value>" lines, by n.

            "file" is stored as "filename" and "duration2" as
            "duration_ms"; entries without a filename are dropped.
            """
            tmp = defaultdict(_MediaDict)
            for line in re.split(r"[ \t]*\r?\n", self._cont):
                # Digits are allowed in the key so that "duration2" is
                # actually recognized.
                m = re.match(r"(\d+)\*([a-z0-9]*)\*(.*)", line.rstrip())
                if m:
                    i, t, n = m.group(1, 2, 3)
                    t = t.lower()
                    t = {
                        "file": "filename",
                        "duration2": "duration_ms",
                    }.get(t, t)
                    n = n.strip()
                    if not n:
                        continue
                    if t == "filename":
                        n = self.filename_conv(n)
                    tmp[int(i)][t] = n
            return (tmp[k]
                    for k in sorted(tmp.keys())
                    if "filename" in tmp[k])

    def reader(self, fn, mandbase="", **options):
        """Return a ``DPL.R`` for the file *fn*."""
        cont = open_textfile(fn, ["DAUMPLAYLIST"]).read()
        return DPL.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* has a line consisting of "DAUMPLAYLIST"."""
        cont = open_textfile(fn, ["DAUMPLAYLIST"]).read()
        return re.search(
            r"^DAUMPLAYLIST\s*$", cont, flags=re.M)
class XSPF(W_BASE):
    """XML Shareable Playlist."""
    encoding = "utf-8"
    template = Template(dedent("""\
    <%page expression_filter="x"/>\\
    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Generated by: ${__MYNAME__} ${__VERSION__} -->
    <playlist version="1" xmlns="http://xspf.org/ns/0/">
    <trackList>
    % for i, media in enumerate(medias):
    <track>
    <title>${media.get_title()}</title>
    % if "{}".format(media.get("tracknumber", "")):
    <trackNum>${media["tracknumber"]}</trackNum>
    % endif
    <location>${normpath(
    media["filename"],
    playlist_outdir=playlist_outdir,
    **normpath_options)}</location>
    % if "{}".format(media.get("album", "")):
    <album>${media["album"]}</album>
    % endif
    % for mk in ("genre", "year", "bitrate"):
    % if "{}".format(media.get(mk, "")):
    <meta rel="${mk}">${media[mk]}</meta>
    % endif
    % endfor
    % if "{}".format(media.get("artist", "")):
    <creator>${media["artist"]}</creator>
    % endif
    % if "{}".format(media.get("duration_ms", "")):
    <duration>${media["duration_ms"]}</duration>
    % endif
    </track>
    % endfor
    </trackList>
    </playlist>
    """))

    class R(R_BASE_ETREE):
        # (media key, child element path) pairs copied from each <track>
        _FIELDS = (
            ("title", "ns0:title"),
            ("artist", "ns0:creator"),
            ("album", "ns0:album"),
            ("duration_ms", "ns0:duration"),
            ("tracknumber", "ns0:trackNum"),
        )
        # <meta rel="..."> values that are kept
        _META_KEYS = ("genre", "year", "bitrate")

        def __init__(self, fn, cont, mandbase="", **options):
            if "cl:" in cont and "xmlns:cl" not in cont:
                # Crude workaround: the "cl:" prefix is used without being
                # declared, so declare a dummy namespace for it.
                cont = re.sub(
                    r"<playlist\b",
                    r"<playlist xmlns:cl='some' ", cont)
            super().__init__(fn, cont, mandbase, **options)

        def medias(self):
            """Yield a media for each <track> that has a location."""
            ns = self._ns
            tracks = self._root.findall("./ns0:trackList/ns0:track", ns)
            for track in tracks:
                location = track.find("ns0:location", ns).text_stripped
                media = _MediaDict()
                if location:
                    media["filename"] = self.filename_conv(location)
                for key, path in self._FIELDS:
                    media[key] = track.find(path, ns).text_stripped
                for meta in track.findall("ns0:meta", ns):
                    rel = meta.attrib.get("rel")
                    if rel in self._META_KEYS:
                        media[rel] = meta.text_stripped
                if "filename" in media:
                    yield media

    def reader(self, fn, mandbase="", **options):
        """Return a ``XSPF.R`` for the file *fn*."""
        text = open_textfile(fn, ["</"]).read()
        return XSPF.R(fn, text, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* contains a "<playlist" element."""
        return re.search(r"<playlist", open_textfile(fn, ["</"]).read())
class HYPETAPE(W_BASE):
    """Hypetape XML Playlist Format"""
    encoding = None
    template = None

    class R(R_BASE_ETREE):
        def medias(self):
            """Yield a media for each <track> that has an "mp3" attribute."""
            for trelem in self._root.findall("./tracks/track"):
                fn = trelem.attrib.get("mp3")
                if fn:
                    yield _MediaDict(
                        filename=self.filename_conv(fn.strip()))

    def reader(self, fn, mandbase="", **options):
        """Return a ``HYPETAPE.R`` for the file *fn*."""
        cont = open_textfile(fn, ["</"]).read()
        return HYPETAPE.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Tell whether *fn* is an xml playlist whose tracks carry "mp3=".
        """
        cont = open_textfile(fn, ["</"]).read()
        # "?" is escaped so that the xml declaration itself is required;
        # unescaped, the pattern matched any text containing "xml".
        return re.search(r"<\?xml", cont) and \
            re.search(r"<playlist", cont) and \
            re.search(r"\smp3\s*=", cont)
class RSS(W_BASE):
    """
    RSS Document, for example https://podcasts.files.bbci.co.uk/p02nq0gn.rss,
    https://feeds.simplecast.com/54nAGcIl,
    https://feeds.npr.org/510318/podcast.xml,
    https://www.pbs.org/newshour/feeds/rss/podcasts/show, and so on.
    (see https://blog.feedspot.com/news_podcasts/)
    """
    # Both are None: this class defines no writer template of its own.
    encoding = None
    template = None

    class R(R_BASE_ATOM):
        """Reader that yields the medias attached to ``channel/item``."""

        def medias(self):
            """
            Yield medias of every item.

            The ``rssreader_pick_enclosure`` option (default True) selects
            the ``<enclosure>`` element, ``rssreader_pick_mrss`` (default
            False) additionally selects what ``_from_mrssitems`` finds.
            """
            pick_mrss = self._options.get("rssreader_pick_mrss", False)
            pick_enclosure = self._options.get(
                "rssreader_pick_enclosure", True)
            for ite in self._root.findall("./channel/item"):
                # Filenames already yielded for this item; handed to
                # _from_mrssitems (presumably to avoid duplicates).
                dejav = set()
                baseinf = dict(
                    title=ite.find("title").text_stripped,
                    date_created=ite.find("pubDate").text_stripped,
                    description=ite.find("description").text_stripped,
                    link=ite.find("link").text_stripped,
                )
                if pick_enclosure:
                    encl = ite.find("enclosure")
                    # NOTE(review): relies on the truthiness of the element
                    # object returned by the project's etree wrapper.
                    if encl:
                        media = self._fromelm(encl, baseinf)
                        # Same guard as the ATOM reader: a falsy result
                        # must neither be yielded nor indexed.
                        if media:
                            yield media
                            dejav.add(media["filename"])
                if not pick_mrss:
                    continue
                for media in self._from_mrssitems(ite, baseinf, dejav):
                    yield media

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return an ``RSS.R`` over it."""
        cont = open_textfile(fn, ["</"]).read()
        return RSS.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return a truthy value if ``fn`` has ``<rss`` and ``<channel``."""
        cont = open_textfile(fn, ["</"]).read()
        return re.search(r"<rss", cont) and \
            re.search(r"<channel", cont)
class ATOM(W_BASE):
    """
    Atom Document, for example
    https://www.youtube.com/feeds/videos.xml?channel_id=UCFyhPCVFsM_0D0YtoFFlkWw,
    https://www.youtube.com/feeds/videos.xml?user=ANNnewsCH
    """
    encoding = None
    template = None

    class R(R_BASE_ATOM):
        """Reader that yields the medias linked from each Atom entry."""

        def medias(self):
            """
            Yield medias of every ``entry``.

            Options are the same as for the RSS reader:
            ``rssreader_pick_enclosure`` (default True) and
            ``rssreader_pick_mrss`` (default False).
            """
            opts = self._options
            want_mrss = opts.get("rssreader_pick_mrss", False)
            want_enclosure = opts.get("rssreader_pick_enclosure", True)
            link_rels = (
                "enclosure",
                "alternate",  # really?
            )

            def _text(entry, name):
                # Stripped text of the child addressed by ``name``.
                return entry.find(self._xpath(name), self._ns).text_stripped

            entries = self._root.findall(
                self._xpath("{atom}entry"), self._ns)
            for entry in entries:
                seen = set()
                common = {
                    "title": _text(entry, "{atom}title"),
                    "date_created": _text(entry, "{atom}published"),
                    "description": _text(entry, "{atom}description"),
                }
                if want_enclosure:
                    links = entry.findall(
                        self._xpath("{atom}link"), self._ns)
                    for link in links:
                        if link.attrib.get("rel") in link_rels:
                            media = self._fromelm(link, common)
                            if media:
                                yield media
                                seen.add(media["filename"])
                if want_mrss:
                    for media in self._from_mrssitems(entry, common, seen):
                        yield media

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return an ``ATOM.R`` over it."""
        text = open_textfile(fn, ["</"]).read()
        return ATOM.R(fn, text, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return a match object if the text of ``fn`` has ``<feed``."""
        text = open_textfile(fn, ["</"]).read()
        return re.search(r"<feed", text)
class RDF(W_BASE):
    """RDF Document"""
    encoding = None
    template = None

    class R(R_BASE_ATOM):
        """Reader that digs media sources out of ``content:encoded``."""

        def __init__(self, fn, cont, mandbase="", **options):
            """Parse ``cont`` and make sure ``bs4`` can be imported."""
            super().__init__(fn, cont, mandbase, **options)
            try:
                import bs4
            except ImportError:
                raise ValueError(
                    "You must install beautifulsoup (bs4) "
                    "if you want to use RDF.")
            else:
                self._bs4 = bs4

        def medias(self):
            """Yield one media per item whose HTML has a ``<source>``."""
            def _text(item, name):
                return item.find(self._xpath(name), self._ns).text_stripped

            items = self._root.findall(self._xpath("{rss}item"), self._ns)
            for item in items:
                title = _text(item, "{rss}title")
                date_created = _text(item, "{dc}date")
                # Extract the html5 media element in "content:encoded".
                # It is unclear whether this layout is common, but such
                # a feed has been seen in the wild.
                html = _text(item, "{content}encoded")
                if not html:
                    continue
                soup = self._bs4.BeautifulSoup(html, features="html.parser")
                source = soup.find(name="source")
                if not source:
                    continue
                converted = self.filename_conv(source.attrs.get("src"))
                if not converted:
                    continue
                yield _MediaDict(
                    filename=converted,
                    title=title,
                    date_created=date_created)

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return an ``RDF.R`` over it."""
        text = open_textfile(fn, ["</rdf"]).read()
        return RDF.R(fn, text, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return a match object if the text of ``fn`` has ``<rdf:RDF``."""
        text = open_textfile(fn, ["</rdf"]).read()
        return re.search(r"<rdf:RDF", text)
class RMP(W_BASE):
    """Real Metadata Package (RMP)"""
    encoding = None
    template = None

    class R(R_BASE_ETREE):
        """Reader for the ``TRACKLIST`` of an RMP package."""

        def medias(self):
            """Yield one media per ``TRACK`` with its location expanded."""
            def _expandpath(fmt, packageid, media):
                # Placeholders: %f=filename, %fid=Track id,
                # %lid=TrackList ID, %pid=package id.
                filename = media["filename"]
                # "%f" must come last, otherwise it would eat the prefix
                # of "%fid".
                substitutions = (
                    ("%fid", media.get("trackid", "")),
                    ("%lid", media.get("tracklistid", "")),
                    ("%pid", packageid),  # ??
                    ("%f", filename),
                )
                for placeholder, value in substitutions:
                    fmt = fmt.replace(placeholder, value)
                media["filename"] = self.filename_conv(fmt)

            # Tags stored under a different media key.
            renames = {
                "size": "file_size",
                # "quality": "bitrate",  # ??
            }
            location = self._root.find("./SERVER/LOCATION").text_stripped
            basefmt = location or "%f"
            listid = ""
            packageid = ""  # ??
            for child in list(self._root.find("./TRACKLIST")):
                if child.tag == "LISTID":
                    listid = child.text_stripped
                    continue
                if child.tag != "TRACK":
                    continue
                media = _MediaDict(tracklistid=listid)
                for field in list(child):
                    key = field.tag_lc
                    media[renames.get(key, key)] = field.text_stripped
                _expandpath(basefmt, packageid, media)
                yield media

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return an ``RMP.R`` over it."""
        text = open_textfile(fn, ["</"]).read()
        return RMP.R(fn, text, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return a truthy value if all three RMP marker tags occur."""
        text = open_textfile(fn, ["</"]).read()
        return re.search(r"<PACKAGE", text) and \
            re.search(r"<TRACKLIST", text) and \
            re.search(r"<TRACK", text)
class XML_GENERAL(W_BASE):
    """
    This reader is not chosen by an input-based decision and is only
    used by explicit instantiation.
    """
    encoding = None
    template = None

    class R(R_BASE_ETREE):
        """
        Reader driven entirely by ``xmlreader_*`` options.

        Options read here: ``xmlreader_search_xpath`` (required; a list of
        xpaths applied level by level, or a list of such lists),
        ``xmlreader_fngetter`` (``"attrs, NAME"``,
        ``"json_in_attrs, NAME, KEY"`` or ``"text"``),
        ``xmlreader_fnpattern`` / ``xmlreader_fnrejpattern`` (regular
        expressions matched against the filename), ``xmlreader_base`` and
        ``xmlreader_fnconv`` (a Python expression, see ``medias``).
        """

        def __init__(self, fn, cont, mandbase="", **options):
            """
            Parse ``cont`` and store the ``xmlreader_*`` options.

            Raises ValueError when ``xmlreader_search_xpath`` is missing.
            """
            super().__init__(fn, cont, mandbase, **options)
            xpathes = options.get("xmlreader_search_xpath")
            if not xpathes:
                raise ValueError(
                    "xmlreader_search_xpath is not specified. " +
                    "you must specify it via --additional_reader_params.")
            if isinstance(xpathes, str):
                # A bare string would otherwise be walked character by
                # character as if each character were an xpath.
                xpathes = [xpathes]
            self._searchxpathes = xpathes
            self._fngetter = re.split(
                r"\s*,\s*",
                options.get("xmlreader_fngetter", "attrs, href").strip())
            self._fnpattern = re.compile(
                options.get("xmlreader_fnpattern", r".*"))
            self._fnrejpattern = None
            if options.get("xmlreader_fnrejpattern"):
                self._fnrejpattern = re.compile(
                    options.get("xmlreader_fnrejpattern"))
            self._base = options.get("xmlreader_base", mandbase)
            self._fnconv = options.get("xmlreader_fnconv")

        def _getfn(self, el):
            """
            Return the raw filename of ``el`` as told by ``_fngetter``.

            Raises ValueError for an unknown getter kind or when the
            getter has too few keys.
            """
            getter = self._fngetter
            kind = getter[0]
            if kind == "attrs":
                if len(getter) < 2:
                    raise ValueError("require a key for attrs")
                return el.attrib.get(getter[1])
            if kind == "json_in_attrs":
                if len(getter) < 3:
                    raise ValueError(
                        "require two keys, for attrs and for json")
                raw = el.attrib.get(getter[1])
                if not raw:
                    # Attribute absent: nothing to decode.
                    return None
                return json.loads(raw).get(getter[2], "")
            if kind == "text":
                # ``text`` is None for an element without text.
                return (el.text or "").strip()
            raise ValueError(
                "unknown xmlreader_fngetter: {!r}".format(kind))

        def medias(self):
            """Yield a media for each element reached by the xpath chain."""
            def _dive(el, depth, xpathes):
                if depth < len(xpathes):
                    xpath = xpathes[depth]
                    for ch in el.findall(xpath, self._ns):
                        for media in _dive(ch, depth + 1, xpathes):
                            yield media
                else:
                    fn = self._getfn(el)
                    if self._fnconv:
                        # NOTE(review): evaluates a user supplied
                        # expression; it may refer to the local names
                        # ``fn``, ``el`` and ``self``, so they must keep
                        # these names. Never feed it untrusted input.
                        fn = eval(self._fnconv)
                    if not fn or not self._fnpattern.match(fn):
                        return
                    if self._fnrejpattern and self._fnrejpattern.match(fn):
                        return
                    r = self.filename_conv(fn, self._base)
                    if not r:
                        return
                    yield _MediaDict(filename=r)

            if isinstance(self._searchxpathes[0], (list, tuple)):
                xpathes_list = self._searchxpathes
            else:
                xpathes_list = [self._searchxpathes]
            for xpathes in xpathes_list:
                for media in _dive(self._root, 0, xpathes):
                    yield media

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return an ``XML_GENERAL.R`` over it."""
        cont = open_textfile(fn, ["</"]).read()
        return XML_GENERAL.R(fn, cont, mandbase, **options)
class XML(W_BASE):
    """
    General XML.
    """
    encoding = None
    template = None

    class R(XML_GENERAL.R):
        """``XML_GENERAL.R`` with a default search xpath."""

        def __init__(self, fn, cont, mandbase="", **options):
            """Fall back to ``["item/link"]`` when no xpath is given."""
            options.setdefault("xmlreader_search_xpath", ["item/link"])
            super().__init__(fn, cont, mandbase, **options)

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return an ``XML.R`` over it."""
        text = open_textfile(fn, ["</"]).read()
        return XML.R(fn, text, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return a match object if the text has any closing tag."""
        text = open_textfile(fn, ["</"]).read()
        return re.search(r"</", text)

    # Warning text attached to this reader.
    reader_warning = _wrap_longwarn("""\
This reader is so limited that it's probably far from what you want.
If you are not satisfied with the result, consider using
--from_generator_module. Or, if the problem is that this reader is
selected based on the extension, you can specify the reader using
"--listfile_reading_format". For example,
"--listfile_reading_format=ATOM".
""")
# Field order of one "|"-separated AIMP track record. The AIMP 2
# reader/writer leave out the last two fields.
AIMPPL3_KEYS = (
    ["filegroup_num", "filename"]
    + ["artist", "album", "genre", "title"]
    + ["duration_ms", "file_size", "__unk2__", "year"]
    + ["sample_rate", "bitrate", "channels"]
    + ["__unk3__", "__unk4__"]  # both 3+
)
class AIMPPL2(W_BASE):
    """playlist of AIMP 2 (.plc)"""
    encoding = "utf-16"
    template = Template(dedent("""\
        <${playlist_title}:-1>
        % for i, media in enumerate(medias):
        ${media.get("filegroup_num", "1")}|${normpath(
            media["filename"],
            playlist_outdir=playlist_outdir,
            **normpath_options)}|${"|".join([
            "{}".format(media.get(k, "")) for k in AIMPPL3_KEYS[2:-2]])}
        % endfor
        """))  # minimal writer

    class R(R_BASE):
        """Reader for the ``<title:n>`` header and ``n|path|...`` lines."""

        def __init__(self, fn, cont, mandbase="", **options):
            """Split the text into the title and the raw entry lines."""
            super().__init__(fn, cont, mandbase, **options)
            self._meta = _NoNullDict()
            self._entlines = []
            title_rx = re.compile(r"<(.*):[\d.-]+>")
            entry_rx = re.compile(r"(\d+)\|(.*)")
            for raw in re.split(r"[ \t]*\r?\n", self._cont):
                text = raw.strip()
                header = title_rx.match(text)
                if header:
                    self._meta["playlist_title"] = header.group(1)
                elif entry_rx.match(text):
                    self._entlines.append(text)

        def playlist_meta(self):
            """Return the metadata collected by the constructor."""
            return self._meta

        def medias(self):
            """Yield one media per entry line, fields named positionally."""
            field_names = AIMPPL3_KEYS[2:-2]
            for record in csv.reader(self._entlines, delimiter="|"):
                media = _MediaDict(filename=self.filename_conv(record[1]))
                media["filegroup_num"] = record[0]
                # zip() stops at the shorter side, so short records are
                # simply left with fewer keys.
                for key, value in zip(field_names, record[2:]):
                    media[key] = value.strip()
                yield media

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return an ``AIMPPL2.R`` over it."""
        text = open_textfile(fn, ["|", "<"]).read()
        return AIMPPL2.R(fn, text, mandbase, **options)
class AIMPPL3(W_BASE):
    """playlist of AIMP 3"""
    encoding = "utf-16"
    template = Template(dedent("""\
        #Name:${playlist_title}
        <%
        curg = None
        %>\\
        % for i, media in enumerate(medias):
        % if curg != media.get("filegroup"):
        #Group:${media.get("filegroup")}|${media.get("filegroup_num")}
        % endif
        #Track:${media.get("filegroup_num", "1")}|${normpath(
            media["filename"],
            playlist_outdir=playlist_outdir,
            **normpath_options)}|${"|".join([
            "{}".format(media.get(k, "")) for k in AIMPPL3_KEYS[2:]])}
        <%
        curg = media.get("filegroup")
        %>\\
        % endfor
        """))  # minimal writer

    class R(R_BASE):
        """Reader for the ``#Key:value`` lines of an AIMP 3 playlist."""

        def __init__(self, fn, cont, mandbase="", **options):
            """
            Sort the ``#Key:value`` lines into groups, track records and
            playlist metadata. Lines of any other shape are ignored.
            """
            super().__init__(fn, cont, mandbase, **options)
            self._meta = _NoNullDict()
            # File key -> metadata key; other keys are stored as they are.
            mkm = {
                "Name": "playlist_title",
            }
            self._entlines = []
            self._filegroups = {}  # group number -> group name
            for line in re.split(r"[ \t]*\r?\n", self._cont):
                s = line.strip()
                m = re.match(r"#([^:]+):(.*)", s)
                if not m:
                    continue
                k, v = m.group(1, 2)
                if k == "Group":
                    # "name|number"; split at the last "|" so that the
                    # name itself may contain one.
                    gnam, _, gnum = v.rpartition("|")
                    self._filegroups[gnum] = gnam
                elif k == "Track":
                    self._entlines.append(v)
                else:
                    self._meta[mkm.get(k, k)] = v

        def playlist_meta(self):
            """Return the metadata collected by the constructor."""
            return self._meta

        def medias(self):
            """Yield one media per ``#Track:`` record."""
            reader = csv.reader(self._entlines, delimiter="|")
            for record in reader:
                r = _MediaDict(filename=self.filename_conv(record[1]))
                r["filegroup_num"] = record[0]
                gnam = self._filegroups.get(record[0])
                if gnam:
                    r["filegroup"] = gnam
                # zip() stops at the shorter side, so short records are
                # simply left with fewer keys.
                for k, v in zip(AIMPPL3_KEYS[2:], record[2:]):
                    r[k] = v.strip()
                yield r

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return an ``AIMPPL3.R`` over it."""
        cont = open_textfile(fn, ["#Track:"]).read()
        return AIMPPL3.R(
            fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return a match object if some line starts with ``#Track:``."""
        cont = open_textfile(fn, ["#Track:"]).read()
        # The leading "#" is part of every track line (see the template
        # and the parser above); without it nothing could ever match.
        return re.search(
            r"^#Track:",
            cont, flags=re.M)
# Field order of one "|"-separated AIMP 4+ track record. The trailing
# comments show a sample value observed for the field.
AIMPPL4_KEYS = (
    ["filename", "title", "artist", "album", "album_artist"]
    + ["genre", "year", "tracknumber"]
    + ["__unk3__", "__unk4__", "__unk5__"]
    + [
        "bitrate",  # |160
        "channels",  # |2
        "sample_rate",  # |44100
        "duration_ms",  # |147749
        "file_size",  # |2959067
    ]
    + [
        "__unk9__",  # |0
        "__unk10__",  # |1
        "__unk11__",  # |1
        "__unk12__",  # |
        "__unk13__",  # |
        "__unk14__",  # |
        "_bitspersample?",  # |0
        "__unk16__",  # |
        "codec",  # |MP3
        "__unk18__",  # |
    ]
)
class AIMPPL4(W_BASE):
    """playlist of AIMP 4+"""
    encoding = "utf-16"
    template = Template(dedent("""\
        #-----SUMMARY-----#
        Name=${playlist_title}
        NameIsAutoSet=0
        ContentFiles=${len(medias)}
        #-----CONTENT-----#
        <%
        curg = None
        %>\\
        % for i, media in enumerate(medias):
        % if curg != media.get("filegroup"):
        -${media.get("filegroup")}
        % endif
        ${normpath(
            media["filename"],
            playlist_outdir=playlist_outdir,
            **normpath_options)}|${"|".join([
            "{}".format(media.get(k, "")) for k in AIMPPL4_KEYS[1:]])}
        <%
        curg = media.get("filegroup")
        %>\\
        % endfor
        """))  # minimal writer

    class R(R_BASE):
        """Reader for the sectioned AIMP 4 playlist text."""

        def __init__(self, fn, cont, mandbase="", **options):
            """Distribute the non-empty lines over the three sections."""
            super().__init__(fn, cont, mandbase, **options)
            # Index 0: summary, 1: settings, 2: content.
            self._sections = [[], [], []]
            # A SUMMARY header (or any unlisted one) leaves the current
            # section unchanged.
            switch_to = {"SETTINGS": 1, "CONTENT": 2}
            current = 0
            for raw in re.split(r"[ \t]*\r?\n", self._cont):
                text = raw.strip()
                header = re.match("#-----([A-Z]+)-----#", text)
                if header:
                    current = switch_to.get(header.group(1), current)
                elif text:
                    self._sections[current].append(text)

        def playlist_meta(self):
            """Return ``playlist_title`` from the ``Name=`` summary line."""
            meta = _NoNullDict()
            for entry in self._sections[0]:
                found = re.match(r"Name=(.*)", entry)
                if found:
                    meta["playlist_title"] = found.group(1)
            return meta

        def medias(self):
            """Yield one media per content record, tracking the group."""
            group_name = ""
            group_number = 1
            field_names = AIMPPL4_KEYS[1:]
            for record in csv.reader(self._sections[2], delimiter="|"):
                first = record[0]
                if first.startswith("-"):  # group
                    # The counter only advances from the second named
                    # group on.
                    if group_name:
                        group_number += 1
                    group_name = first[1:]
                    continue
                media = _MediaDict(
                    filename=self.filename_conv(first),
                    filegroup=group_name,
                    filegroup_num="{}".format(group_number))
                for key, value in zip(field_names, record[1:]):
                    media[key] = value.strip()
                yield media

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return an ``AIMPPL4.R`` over it."""
        text = open_textfile(fn, ["#---"]).read()
        return AIMPPL4.R(fn, text, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return a match object if a CONTENT section header exists."""
        text = open_textfile(fn, ["#---"]).read()
        return re.search(
            r"^#-----CONTENT-----#\s*$",
            text, flags=re.M)
class FPL(W_BASE):
    """playlist of Foobar2000"""
    encoding = None
    template = None

    class R(R_BASE):
        """Reader built on the third-party ``fpl_reader`` package."""

        def medias(self):
            """Yield one media per track reported by ``read_playlist``."""
            # Download the source from 'https://github.com/rr-/fpl_reader'
            # and run "setup.py install".
            from fpl_reader import read_playlist
            for track in read_playlist(self._cont).tracks:
                fn = self.filename_conv(
                    track.file_name.decode("utf-8"))
                res = _MediaDict(filename=fn)
                res["file_size"] = track.file_size
                # res["file_time"] = track.file_time
                res["duration"] = track.duration
                # rpg_album, rpg_track, rpk_album, rpk_track
                # Secondary keys come last, so they win on a name clash.
                for keys in (track.primary_keys, track.secondary_keys):
                    for k, v in keys.items():
                        kd = k.decode().replace(" ", "_").lower()
                        res[kd] = v.decode("utf-8")
                yield res

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as bytes and return an ``FPL.R`` over them."""
        # Close the file deterministically instead of leaking the handle.
        with io.open(fn, "rb") as fi:
            cont = fi.read()
        return FPL.R(fn, cont, mandbase, **options)
class PLA(W_BASE):
    """
    playlists for iriver T20/T50/S10/E100
    see https://phintsan.kapsi.fi/iriver-t50.html
    """
    encoding = None
    template = None

    class R(R_BASE):
        """Reader for the 512-byte records of an iriver ``.pla`` file."""

        @staticmethod
        def _until_utf16_nul(raw):
            """Return ``raw`` cut at the first aligned UTF-16 NUL unit."""
            # The search must advance two bytes at a time: an unaligned
            # b"\\0\\0" also occurs inside ordinary text, e.g. U+3000
            # followed by an ASCII character (30 00 | 00 41).
            for off in range(0, len(raw) - 1, 2):
                if raw[off:off + 2] == b"\0\0":
                    return raw[:off]
            # No terminator: the name fills the whole field.
            return raw[:len(raw) - len(raw) % 2]

        def medias(self):
            """
            Yield one media per record.

            Raises ValueError when the magic string is missing.
            """
            cont = self._cont
            n, = struct.unpack(">i", cont[:4])
            magic, = struct.unpack("14s", cont[4:18])
            if magic != b'iriver UMS PLA':
                raise ValueError("not 'iriver UMS PLA'")
            pos = 512  # the first record follows the 512-byte header
            for i in range(n):
                # fidx points to the start of the tail part of the path;
                # it is parsed but not needed here.
                fidx, = struct.unpack(">H", cont[pos:pos + 2])
                pos += 2
                b, = struct.unpack("510s", cont[pos:pos + 510])
                b = self._until_utf16_nul(b)
                fn = self.filename_conv(b.decode("utf-16be"))
                yield _MediaDict(filename=fn)
                pos += 510

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as bytes and return a ``PLA.R`` over them."""
        with io.open(fn, "rb") as fi:
            cont = fi.read()
        return PLA.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return True if the magic string occurs anywhere in ``fn``."""
        with io.open(fn, "rb") as fi:
            cont = fi.read()
        return b'iriver UMS PLA' in cont

    def write(self, fn, native_encoding=None, **tmpldata):
        """
        Write the medias of ``tmpldata`` to ``fn`` in ``.pla`` layout.

        ``tmpldata`` supplies ``medias``, ``playlist_outdir`` and
        ``normpath_options``. ``native_encoding`` is not used here.
        """
        _log.warning(self.writer_warning)
        #
        magic = b'iriver UMS PLA' + b"\x00" * 14 + b'Quick List'
        #
        medias = tmpldata.get("medias")
        playlist_outdir = tmpldata.get("playlist_outdir")
        normpath_options = tmpldata.get("normpath_options")
        with io.open(fn, "wb") as fo:
            # Header: record count, magic, zero padding up to 512 bytes.
            fo.write(struct.pack(">i", len(medias)))
            fo.write(magic)
            restlen = 512 - (4 + len(magic))
            fo.write(struct.pack("{}x".format(restlen)))
            for media in medias:
                p = normpath(
                    media["filename"],
                    playlist_outdir, **normpath_options)
                # 1-based index of the character after the last "/".
                fidx = 1
                if "/" in p:
                    fidx = p.rindex("/") + 2
                fo.write(struct.pack(">H", fidx))
                es = p.replace("/", "\\").encode("utf-16be")
                fo.write(es)
                # Zero padding up to the 512-byte record size.
                restlen = 512 - (2 + len(es))
                fo.write(struct.pack("{}x".format(restlen)))

    writer_warning = _wrap_longwarn("""\
It is not recommended to output as ".pla". Very few players can handle
this (of course except but iRiver portable player itself), and I have
never seen a player who could read it correctly. BTW, you will have to
specify 'keep' as 'pathmode' if what you need is to export to
iRiver portable player.
""")
class PPL(W_BASE):
    """Passion Audio Player"""
    encoding = None
    template = None

    class R(R_BASE):
        """Reader for the binary record stream of a ``.ppl`` file."""

        def medias(self):
            """Yield one media per record until the buffer is used up."""
            cont = self._cont

            def _read_str(at):
                # Little-endian uint32 length followed by UTF-8 bytes;
                # returns (text, position after the string).
                slen, = struct.unpack("<I", cont[at:at + 4])
                at += 4
                s, = struct.unpack("%ds" % slen, cont[at:at + slen])
                return s.decode("utf-8"), at + slen

            pos = 0
            while pos < len(cont):
                media = _MediaDict()
                # filename
                s, pos = _read_str(pos)
                media["filename"] = self.filename_conv(s)
                # artist, title, album, genre, year
                for key in ("artist", "title", "album", "genre", "year"):
                    media[key], pos = _read_str(pos)
                # tracknumber, stored as an integer, so a value that is
                # not a plain number such as "2a" appears as zero.
                tracknumber, = struct.unpack("<I", cont[pos:pos + 4])
                media["tracknumber"] = "{}".format(tracknumber)
                pos += 4
                # unknown 8 bytes
                pos += 8
                # duration in seconds (int)
                duration, = struct.unpack("<I", cont[pos:pos + 4])
                media["duration"] = duration
                pos += 4
                # file_size
                file_size, = struct.unpack("<Q", cont[pos:pos + 8])
                media["file_size"] = file_size
                pos += 8
                # unknown 61 bytes
                pos += 61
                #
                yield media

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as bytes and return a ``PPL.R`` over them."""
        # Close the file deterministically instead of leaking the handle.
        with io.open(fn, "rb") as fi:
            cont = fi.read()
        return PPL.R(fn, cont, mandbase, **options)
# Maps the keys of a property-list track entry to media dictionary keys;
# track keys that are not listed are dropped by the PLIST reader.
# NOTE(review): "samplerate" differs from the "sample_rate" spelling used
# by AIMPPL3_KEYS / AIMPPL4_KEYS; confirm which one the rest of the file
# expects before unifying.
PLIST_KEYMAP = dict([
    ("Name", "name"),
    ("Artist", "artist"),
    ("Album", "album"),
    ("Genre", "genre"),
    ("Size", "file_size"),
    ("Total Time", "duration_ms"),
    ("Track Number", "tracknumber"),
    ("Year", "year"),
    ("Bit Rate", "bitrate"),
    ("Sample Rate", "samplerate"),
    ("Location", "filename"),
])
class PLIST(W_BASE):
    """
    The “property list” files used by Apple, primarily on macOS and iOS.
    In terms of "playlist", it is used by iTunes.
    """
    encoding = None
    template = None

    class R(R_BASE):
        """Reader for a property list with ``Tracks`` and ``Playlists``."""

        def __init__(self, fn, cont, mandbase="", **options):
            """
            Parse the property list.

            ``cont`` may be the raw bytes of the file or, as before, an
            open binary file object.
            """
            super().__init__(fn, cont, mandbase, **options)
            if isinstance(self._cont, (bytes, bytearray)):
                self._plist = plistlib.loads(bytes(self._cont))
            else:
                self._plist = plistlib.load(self._cont)

        def medias(self):
            """Yield one media per item of every playlist."""
            tracks = self._plist["Tracks"]
            playlists = self._plist.get("Playlists", [])
            for pl in playlists:
                # A playlist without items has no "Playlist Items" key.
                for it in pl.get("Playlist Items", []):
                    trkid = "{}".format(it["Track ID"])
                    track = tracks.get(trkid)
                    if track is None:
                        # Item referring to a track that is not listed.
                        continue
                    res = _MediaDict()
                    for pk in track.keys():
                        k = PLIST_KEYMAP.get(pk)
                        if k:
                            v = track[pk]
                            if k == "filename":
                                v = self.filename_conv(v)
                            res[k] = "{}".format(v)
                    yield res

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as bytes and return a ``PLIST.R`` over them."""
        # Read everything up front so that the handle can be closed here
        # instead of staying open for the lifetime of the reader.
        with io.open(fn, "rb") as fi:
            cont = fi.read()
        return PLIST.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return a match object if the text names a PropertyList DTD."""
        cont = open_textfile(fn, ["</"]).read()
        return re.search(
            r"PropertyList-[0-9.]+.dtd",
            cont, flags=re.M)
class HTML(W_BASE):
    """
    html5 writer for testing, and generic html reader.
    'bs4' is required for reader.
    """
    encoding = "utf-8"
    # Mako template of the page. For every media it renders a heading
    # (wrapped in a link when the media has "link"), the optional
    # description, and then an element chosen by MIME type: <video>,
    # <audio>, <img>, or a plain link when the type is unknown or
    # something else. YouTube watch URLs additionally get an embed
    # <iframe>.
    # NOTE(review): only the <h1> title and the media title are passed
    # through the "h" filter; the other expressions are emitted as is.
    template = Template(dedent('''\
        <!DOCTYPE html>
        <html>
        <head>
        <title>${playlist_title}</title>
        <meta charset="${playlist_encoding}">
        % if html5writer_css:
        <link rel="stylesheet" type="text/css" href="${html5writer_css}" />
        % endif
        </head>
        <!-- Generated by: ${__MYNAME__} ${__VERSION__} -->
        <body>
        <h1>${playlist_title | h}</h1>
        <p>${playlist_description}</p>
        % for i, media in enumerate(medias):
        <h2>
        % if "link" in media:
        <a href="${media["link"]}" target=_blank>
        % endif
        % if media.get("date_created"):
        ${media.get("date_created")} |
        % endif
        ${media.get_title() | h}
        % if "link" in media:
        </a>
        % endif
        </h2>
        % if "description" in media:
        ${media["description"]}
        % endif
        <%
        src = normpath(
            media["filename"],
            playlist_outdir=playlist_outdir, **normpath_options)
        if "mimetype" in media:
            type = media["mimetype"]
        else:
            pp = urllib_urlsplit(src).path
            type, _ = mimetypes.guess_type(pp)
        m = re.match(
            r"(https://www.youtube.com/)(?:watch\\?v=|v/)([^?&]+)([\\?&].*)?",
            src)
        src2 = ""
        if m:
            src2 = m.group(1) + "embed/" + m.group(2)
            pa = m.group(3)
            if pa:
                src2 += "?" + pa[1:]
        %>
        <p>
        % if m:
        <iframe src="${src2}" frameborder="0" allowfullscreen="1"></iframe>
        % endif
        % if not type:
        <a href="${src}" target=_blank>${src}</a>
        % elif type.startswith("video"):
        <video controls><source src="${src}" type="${type}"></video>
        % elif type.startswith("audio"):
        <audio controls><source src="${src}" type="${type}"></audio>
        % elif type.startswith("image"):
        <img src="${src}" type="${type}"/>
        % else:
        <a href="${src}" target=_blank>${src}</a>
        % endif
        </p>
        % endfor
        </body>
        </html>
        '''))

    class R(BSHTML_READER.R):
        """``BSHTML_READER.R`` preset for html5 media elements."""

        def __init__(self, fn, cont, mandbase="", **options):
            """
            Force the search criteria and the filename getter, then
            delegate to the base reader. Values passed by the caller for
            these two options are overwritten.
            """
            # Three search chains: audio > source, video > source, img.
            options["bsreader_findcriterias"] = [
                [
                    {
                        "name": "audio"
                    },
                    {
                        "name": "source"
                    }
                ],
                [
                    {
                        "name": "video"
                    },
                    {
                        "name": "source"
                    }
                ],
                [
                    {
                        "name": "img"
                    }
                ],
            ]
            # The filename is taken from the "src" attribute.
            options["bsreader_fngetter"] = "attrs, src"
            super().__init__(fn, cont, mandbase, **options)

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return an ``HTML.R`` over it."""
        cont = open_textfile(fn, ["<html"]).read()
        return HTML.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return True if the text of ``fn`` contains ``<html``."""
        cont = open_textfile(fn, ["<html"]).read()
        return "<html" in cont

    # Warning text attached to this reader.
    reader_warning = _wrap_longwarn("""\
This reader is so limited that it's probably far from what you want.
If you are not satisfied with the result, use BSHTML_READER explicitly
or consider using --from_generator_module.
""")
class BAFL(W_BASE):
    """
    The target filelist of BurnAware's "MakeISO" program.
    Strictly speaking this is not a "playlist", but BurnAware's user interface
    lacks some kind of usability, so it is convenient to be able to generate
    this list outside of BurnAware. BurnAware, in particular, is very difficult
    to use because it makes a fundamental mistake in the file order control
    design policy.
    The created list file can be passed to the MakeISO program as follows:
    $ MakeISO.exe udf aaa.bafl
    $ MakeISO.exe udf aaa.bafl /path/to/out/mydiscimage.iso
    $ MakeISO.exe udf aaa.bafl /path/to/out/mydiscimage.iso -x
    """
    encoding = "utf-8"
    # The attribute lines of <file> and <dir> must keep their leading
    # whitespace: _post_render() removes every newline, so that
    # whitespace is all that separates the attributes in the output.
    template = Template(dedent("""\
        <%page expression_filter="x"/>\\
        <%def name="dumpfile(i, media)">
        <%
        st = os.stat(media["filename"])
        %>
        <file
          name="${'{:03d}'.format(i + 1)}_${os.path.basename(media["filename"])}"
          date="${datetime.fromtimestamp(
              st.st_mtime).strftime('%Y/%m/%d %H:%M:%S')}"
          parameter="" priority="0" hidden="0"
          size="${st.st_size}"
          source="${normpath(
              media["filename"],
              playlist_outdir=playlist_outdir, **normpath_options)}"
        />
        </%def>
        <%def name="dumptr(tr)">
        % for dn, t in tr.items():
        % if not dn: # files
        % for i, m in enumerate(t):
        % if m:
        ${dumpfile(i, m)}
        % endif
        % endfor
        % else: # directory
        <dir name="${os.path.basename(dn)}"
          date="" parameter="" priority="0" hidden="0" size="0">
        ${dumptr(t)}
        </dir>
        % endif
        % endfor
        </%def>
        <%
        tit = playlist_title
        if not tit:
            tit = datetime.now().strftime('%H%M_%d%m%Y')
        %>
        <compilation name="${tit}">
        ${dumptr(filegroup_tree(medias))}
        </compilation>
        """))

    def _post_render(self, rendered):
        """Return ``rendered`` with all line breaks removed."""
        return re.sub(r"\r?\n", "", rendered)

    class R(R_BASE_ETREE):
        """Reader for the ``<dir>`` / ``<file>`` tree of a compilation."""

        def playlist_meta(self):
            """Return ``playlist_title`` from the root ``name`` attribute."""
            return _NoNullDict(
                playlist_title=self._root.attrib.get("name"))

        def medias(self):
            """
            Yield one media per ``<file>`` having a ``source`` attribute;
            ``filegroup`` is the "/"-joined path of the enclosing
            directories, or "" for files directly under the root.
            """
            def _media(elem, d):
                # Returns None for a <file> without "source".
                src = elem.attrib.get("source")
                if src:
                    src = self.filename_conv(src)
                    r = _MediaDict(filename=src)
                    r["name"] = elem.attrib.get(
                        "name", path_noext(src))
                    r["filegroup"] = d
                    return r

            def _dive(contelem, d):
                for el in list(contelem):
                    if el.tag == "dir":
                        dc = os.path.join(
                            d, el.attrib.get("name")).replace("\\", "/")
                        for media in _dive(el, dc):
                            yield media
                    elif el.tag == "file":
                        media = _media(el, d)
                        if media:
                            yield media

            for el in list(self._root):
                if el.tag == "file":
                    # Root-level files always belong to the "" group, no
                    # matter how many directories preceded them.
                    media = _media(el, "")
                    if media:
                        yield media
                elif el.tag == "dir":
                    for media in _dive(el, el.attrib.get("name")):
                        yield media

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return a ``BAFL.R`` over it."""
        cont = open_textfile(fn, ["<compilation"]).read()
        return BAFL.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return True if the text of ``fn`` contains ``<compilation``."""
        cont = open_textfile(fn, ["<compilation"]).read()
        return "<compilation" in cont
class IRP(W_BASE):
    """
    The project file of InfraRecorder.
    This is not a "playlist", but is a InfraRecorder's project file.
    You will have to specify "irp_projecttype" via "--additional_template_params"
    if you want to use this as writer.
    """
    encoding = "utf-16"
    # Mako template of the project file. A falsy "irp_projecttype" gives
    # an <Audio> project; otherwise a <Data> project is written, in which
    # each distinct "filegroup" gets an entry with flags="1" followed by
    # the medias with flags="0". The template forces pathmode "abs".
    # filetime() adds the number of seconds between 1601-01-01 and
    # 1970-01-01 to the file's mtime and scales the sum by 10**7.
    template = Template(dedent("""\
        <%page expression_filter="x"/>\\
        <%def name="filetime(media)">\\
        <%
        mt = os.stat(media["filename"]).st_mtime + (
            datetime(1970, 1, 1) - datetime(1601, 1, 1)).total_seconds()
        mt = int(mt * 10**7)
        %>\\
        <FileTime>${mt}</FileTime>\\
        </%def>\\
        <%
        normpath_options["pathmode"] = "abs"
        fgs = set(filter(None, [m.get("filegroup", "") for m in medias]))
        %>\\
        <?xml version="1.0" encoding="utf-16" standalone="yes"?>
        <InfraRecorder>
        % if not irp_projecttype: # audio
        <Project version="3" type="1" media="12">
        <Audio>
        % for i, media in enumerate(medias):
        <File${i}>
        <InternalName>${media.get_title()}</InternalName>
        <FullPath>${normpath(
            media["filename"],
            playlist_outdir=playlist_outdir, **normpath_options)}</FullPath>
        </File${i}>
        % endfor
        </Audio>
        % else: # video or data
        % if irp_projecttype == 1: # video
        <Project version="3" type="0" media="1">
        % else:
        <Project version="3" type="0" media="4">
        % endif
        <Data>
        % for i, fg in enumerate(fgs):
        <File${i} flags="1">
        <InternalName>${fg}</InternalName>
        </File${i}>
        % endfor
        % for i, media in enumerate(medias):
        <File${i + len(fgs)} flags="0">
        <InternalName>${os.path.join(
            media.get("filegroup", ""),
            media.get_title()).replace("\\\\", "/")}</InternalName>
        <FullPath>${normpath(
            media["filename"],
            playlist_outdir=playlist_outdir, **normpath_options)}</FullPath>
        ${filetime(media)}
        </File${i + len(fgs)}>
        % endfor
        </Data>
        % endif
        </Project>
        </InfraRecorder>
        <!-- Generated by: ${__MYNAME__} ${__VERSION__} -->
        """))

    class R(R_BASE_ETREE):
        """Reader for the file entries of an InfraRecorder project."""

        def playlist_meta(self):
            """Return ``playlist_title`` from ``Project/Label``."""
            return _NoNullDict(
                playlist_title=self._root.find("Project/Label").text_stripped)

        def medias(self):
            """Yield one media per entry whose ``flags`` is "0"."""
            delm = self._root.find("Project/Data")
            # NOTE(review): relies on the truthiness of the element
            # object returned by the project's etree wrapper.
            if not delm:
                delm = self._root.find("Project/Audio")
            for felm in list(delm):
                flags = felm.attrib.get("flags", "0")
                if flags != "0":  # it seems directory
                    continue
                title = felm.find("InternalName").text_stripped
                # The directory part of InternalName is the filegroup.
                filegroup, title = os.path.split(title)
                filename = self.filename_conv(felm.find("FullPath").text_stripped)
                # felm.find("FileTime")
                yield _MediaDict(
                    filename=filename,
                    filegroup=filegroup.replace("\\", "/"),
                    title=title)

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return an ``IRP.R`` over it."""
        cont = open_textfile(fn, ["<InfraRecorder>"]).read()
        return IRP.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return True if the text contains ``</InfraRecorder>``."""
        cont = open_textfile(fn, ["<InfraRecorder>"]).read()
        return "</InfraRecorder>" in cont
class IBB(W_BASE):
    """
    The project file of ImgBurn.
    This is not a "playlist", but is a ImgBurn's project file.
    """
    encoding = "utf-16"
    template = Template(dedent("""\
        IBB
        [START_BACKUP_LIST]
        % for i, media in enumerate(medias):
        ${normpath(
            media["filename"],
            playlist_outdir=playlist_outdir, **normpath_options)}
        % endfor
        [END_BACKUP_LIST]
        """))

    class R(R_BASE):
        """Reader for the lines between the two backup-list markers."""

        def medias(self):
            """Yield one media per line of the backup list."""
            lines = iter(re.split(r"[ \t]*\r?\n", self._cont))
            # Phase 1: skip everything up to and including the start
            # marker.
            for line in lines:
                if line == "[START_BACKUP_LIST]":
                    break
            # Phase 2: every following line is an entry until the end
            # marker (or the end of the text) is reached.
            for line in lines:
                if line == "[END_BACKUP_LIST]":
                    return
                yield _MediaDict(filename=self.filename_conv(line))

    def reader(self, fn, mandbase="", **options):
        """Read ``fn`` as text and return an ``IBB.R`` over it."""
        text = open_textfile(fn, ["[START_BACKUP_LIST]"]).read()
        return IBB.R(fn, text, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return True if the text of ``fn`` starts with ``IBB``."""
        text = open_textfile(fn, ["[START_BACKUP_LIST]"]).read()
        return text.startswith("IBB")
class AXP(W_BASE):
    """
    The audio compilation project file of CDBurnerXP.
    This is not a "playlist", but is a CDBurnerXP's project file.
    """
    encoding = "utf-8"
    # Written document.  The Python block assigns CDBurnerXP_version
    # unconditionally (context value first, then the default): a name
    # assigned anywhere in a Mako template is local to the whole template,
    # so assigning it only when the context lacks it would leave it unbound
    # whenever the caller does supply a value.
    template = Template(dedent('''\
        <%page expression_filter="x"/>\\
        <%
        normpath_options["pathmode"] = "abs"
        CDBurnerXP_version = context.get("CDBurnerXP_version", UNDEFINED)
        if CDBurnerXP_version is UNDEFINED:
            CDBurnerXP_version = "4.5.8.7128"
        now = datetime.now()
        now_d = now.strftime("%Y/%m/%d")
        now_t = now.strftime("%H:%M")
        %>\\
        <?xml version="1.0" encoding="''' + encoding + '''" standalone="yes"?>
        <!DOCTYPE layout PUBLIC "http://www.cdburnerxp.se/help/audio.dtd" "">
        <?xml-stylesheet type='text/xsl' href='http://www.cdburnerxp.se/help/compilation.xsl'?>
        <!-- Generated by: ${__MYNAME__} ${__VERSION__} -->
        <layout type="Audio" version="${CDBurnerXP_version}" date="${now_d}" time="${now_t}">
        <compilation name="audio-template" title="${playlist_title}" artist="">
        % for i, media in enumerate(medias):
        <track path="${normpath(
            media['filename'],
            playlist_outdir=playlist_outdir, **normpath_options)}" \\
        title="${media.get_title()}" \\
        artist="${media.get("artist", "")}" \\
        pregap="0" postgap="0" \\
        number="${i + 1}" />
        % endfor
        </compilation>
        </layout>
        '''))

    # Reader: works on the parsed XML tree available as ``self._root``.
    class R(R_BASE_ETREE):
        def playlist_meta(self):
            """Return the playlist metadata found in the project.

            The playlist title is the ``title`` attribute of the
            ``compilation`` element (None when the attribute is absent).
            """
            compilation = self._root.find("compilation")
            return _NoNullDict(
                playlist_title=compilation.attrib.get("title"))

        def medias(self):
            """Yield a ``_MediaDict`` for each ``compilation/track`` element.

            ``path``, ``title`` and ``artist`` attributes default to an
            empty string when missing.
            """
            for track in self._root.findall("compilation/track"):
                attrs = track.attrib
                yield _MediaDict(
                    filename=self.filename_conv(attrs.get("path", "")),
                    title=attrs.get("title", ""),
                    artist=attrs.get("artist", ""),
                )

    def reader(self, fn, mandbase="", **options):
        """Return an ``AXP.R`` reader built from the text content of *fn*."""
        cont = open_textfile(fn, ["<compilation"]).read()
        return AXP.R(fn, cont, mandbase, **options)

    @staticmethod
    def accept_as_reader(fn):
        """Return True when the text of *fn* has a ``type="Audio"`` attribute.

        Returns a real bool rather than the match object.
        """
        cont = open_textfile(fn, ["<compilation"]).read()
        return re.search(r'''type\s*=\s*["']Audio["']''', cont) is not None
class DXP(W_BASE): | |
""" | |
The data compilation project file of CDBurnerXP. | |
This is not a "playlist", but is a CDBurnerXP's project file. | |
""" | |
encoding = "utf-8" | |
template = Template(dedent('''\ | |
<%page expression_filter="x"/>\\ | |
<%def name="dumpfile(i, media)"> | |
<% | |
st = os.stat(media["filename"]) | |
%> | |
<file |