Skip to content

Instantly share code, notes, and snippets.

@rgaudin
Last active March 23, 2023 13:52
Show Gist options
  • Save rgaudin/326a808965a6efbce56cdf70c4695dd1 to your computer and use it in GitHub Desktop.
Save rgaudin/326a808965a6efbce56cdf70c4695dd1 to your computer and use it in GitHub Desktop.
Python implementation of zimrecreate that also allows updating metadata
#!/usr/bin/env python3
import argparse
import base64
import pathlib
import re
import sys
import traceback
from typing import List, Optional
try:
from libzim.reader import Archive
from libzim.writer import Creator, Item, ContentProvider, Blob
from libzim.writer import Hint
except ImportError as exc:
print("zimrecreate requires python-libzim")
print("Install it with `pip install libzim`")
print(str(exc))
sys.exit(2)
# Version string reported by the `--version` command-line flag.
__version__ = "1.0"
# Module-level verbosity default; `recreate()` takes its own `debug` argument.
debug = False
# Matches illustration metadata names such as `Illustration_48x48@1`.
# The quantifier lives inside the `scale` group so that a multi-digit scale is
# captured whole instead of only its last digit.
illus_re = re.compile(r"^Illustration_(?P<size>\d+x\d+)@(?P<scale>\d+)$")
class ProxyContentProvider(ContentProvider):
    """Content provider that serves the bytes of an existing source item.

    `source` is expected to expose `size` and `content` attributes.
    """

    def __init__(self, source):
        super().__init__()
        self.source = source

    def get_size(self) -> int:
        """Return the size reported by the wrapped source."""
        return self.source.size

    def gen_blob(self) -> Blob:
        """Yield the whole source content as one blob, then an empty blob."""
        payload = bytes(self.source.content)
        yield Blob(payload)
        # trailing empty blob, kept from the original implementation
        yield Blob(b"")
class ProxyItem(Item):
    """Writer item mirroring the path, title, mimetype and content of a source item.

    `is_front` carries the FRONT_ARTICLE hint to apply; `None` means no hint
    is emitted at all.
    """

    def __init__(self, source, is_front: Optional[bool]):
        super().__init__()
        self.is_front = is_front
        self.source = source

    def get_path(self) -> str:
        """Return the path of the wrapped source item."""
        return self.source.path

    def get_title(self) -> str:
        """Return the title of the wrapped source item."""
        return self.source.title

    def get_mimetype(self) -> str:
        """Return the mimetype of the wrapped source item."""
        return self.source.mimetype

    def get_contentprovider(self):
        """Return a provider that proxies the wrapped item's content."""
        return ProxyContentProvider(self.source)

    def get_hints(self):
        """Return the FRONT_ARTICLE hint mapping, or an empty dict if unset."""
        front = self.is_front
        return {} if front is None else {Hint.FRONT_ARTICLE: front}
def get_title_listing_v1_index(zim: "Archive") -> int:
    """Return the entry index of the v1 title listing, or -1 if there is none.

    Only the last nine entry indexes of the archive are inspected, from the
    highest index downwards. A non-negative result means the ZIM carries a
    `listing/titleOrdered/v1` entry (i.e. it records front articles).
    """
    entry_count = zim.all_entry_count
    # Clamp the (exclusive) lower bound at -1 so that small archives still get
    # index 0 inspected and an empty archive never probes a negative index.
    lowest_excluded = max(entry_count - 10, -1)
    for index in range(entry_count - 1, lowest_excluded, -1):
        if zim._get_entry_by_id(index).path == "listing/titleOrdered/v1":
            return index
    return -1
def get_front_articles(zim: Archive) -> List[int]:
    """List the entry indexes stored in the archive's v1 title listing.

    Returns an empty list when `get_title_listing_v1_index()` reports no
    listing. Otherwise the listing item's content is decoded as consecutive
    4-byte little-endian integers.
    """
    listing_index = get_title_listing_v1_index(zim)
    if listing_index < 0:
        return []

    listing = zim._get_entry_by_id(listing_index).get_item()
    raw = listing.content
    # only whole 4-byte records are decoded; trailing bytes are ignored
    usable_length = 4 * (listing.size // 4)
    return [
        int.from_bytes(raw[start : start + 4], byteorder="little")
        for start in range(0, usable_length, 4)
    ]
def recreate(
    src_path: pathlib.Path,
    dst_path: pathlib.Path,
    new_meta: List,
    nb_workers: int,
    debug: bool,
) -> int:
    """Recreate the ZIM at `src_path` into `dst_path`, optionally overriding metadata.

    Args:
        src_path: path of the source ZIM file.
        dst_path: path of the ZIM file to create.
        new_meta: `Key=value` strings. A value starting with `data:` must be a
            `data:<mimetype>;base64,<payload>` URL and is stored as binary;
            any other value is stored as UTF-8 plain text.
        nb_workers: number of workers handed to the libzim Creator.
        debug: turn on the Creator's verbose mode and print each added entry.

    Returns:
        0 on success, 2 when a metadata parameter is malformed or when the
        destination would lack a mandatory metadata.

    Raises:
        IOError: if the source ZIM cannot be opened.
    """
    print(f"Starting zimrecreate\n\tfrom: {src_path}\n\tinto: {dst_path}")
    src_path = src_path.expanduser().resolve()
    dst_path = dst_path.expanduser().resolve()

    print(f"Checking provided metadata ({len(new_meta)})")
    # check user-specified metadata first, before touching any ZIM file
    data_url_re = re.compile(r"^data:(?P<mimetype>.+);base64,(?P<payload>.+)$")
    new_metadata = {}
    for line in new_meta:
        try:
            key, data = line.split("=", 1)
        except ValueError:
            print(f"ERROR: Malformed metadata param: {line}")
            return 2

        # text metadata
        if not data.startswith("data:"):
            new_metadata[key] = ("text/plain;charset=UTF-8", data.encode("UTF-8"))
            continue

        # binary metadata
        data_match = data_url_re.match(data)
        if data_match is None:
            print(f"ERROR: Malformed binary metadata param: {data}")
            return 2
        mimetype, payload = data_match.groups()
        try:
            new_metadata[key] = (mimetype, base64.b64decode(payload))
        except ValueError:  # binascii.Error is a ValueError subclass
            print(f"ERROR: Invalid base64 payload: {payload}")
            return 2

    print("Analyzing source ZIM")
    try:
        src = Archive(src_path)
    except Exception as exc:
        raise IOError(f"Source ZIM ({src_path}) doesnt exists: {exc}") from exc

    if not src.has_new_namespace_scheme:
        print("WARNING: Source ZIM had namespaces.")

    main_path = (
        src.main_entry.get_redirect_entry().path
        if src.main_entry.is_redirect
        else src.main_entry.path
    )

    # a set keeps the per-entry membership test below constant-time
    front_articles = frozenset(get_front_articles(src))

    # retrieve source metadata
    metadata = {}
    for name in src.metadata_keys:
        # illustrations handled separately, Counter added by libzim
        if name == "Counter" or illus_re.match(name):
            continue
        item = src.get_metadata_item(name)
        metadata[name] = (item.mimetype, bytes(item.content))

    for size in src.get_illustration_sizes():
        item = src.get_illustration_item(size)
        metadata[f"Illustration_{size}x{size}@1"] = (item.mimetype, bytes(item.content))

    # override metadata with user-provided ones
    metadata.update(new_metadata)

    print("Computed new ZIM metadata to:")
    for key, (mimetype, payload) in metadata.items():
        if mimetype.startswith("text/plain"):
            preview = payload.decode("UTF-8") if isinstance(payload, bytes) else payload
        else:
            # report the size of the payload, not the length of the mimetype
            preview = f"{mimetype} binary ({len(payload)} bytes)"
        print(f"\t{key}: {preview}")

    missing_mandatory_metadata = [
        name
        for name in [
            "Title",
            "Description",
            "Creator",
            "Publisher",
            "Date",
            "Name",
            "Language",
        ]
        if name not in metadata
    ]
    if missing_mandatory_metadata:
        print(
            "ERROR: Destination ZIM would lack mandatory metadata: "
            f"{', '.join(missing_mandatory_metadata)}."
        )
        return 2

    print("Starting destination ZIM Creator")
    dst = Creator(filename=dst_path).config_nbworkers(nb_workers)
    if debug:
        dst.config_verbose(True)
    if "Language" in metadata:
        dst.config_indexing(True, metadata["Language"][1].decode("UTF-8"))
    dst.set_mainpath(main_path)
    # NOTE(review): the creator is entered and exited by hand; if anything
    # below raises, `__exit__` is never called. Confirm how the targeted
    # libzim version behaves on exit-after-error before switching to `with`.
    dst.__enter__()

    # metadata
    print("Adding metadata & illustrations")
    for key, (mimetype, payload) in metadata.items():
        if debug:
            print(f"> {key}")
        illus_match = illus_re.match(key)
        if illus_match:
            # "48x48" -> 48
            size = int(illus_match.group("size").split("x", 1)[0])
            dst.add_illustration(size, payload)
            continue
        dst.add_metadata(key, content=payload, mimetype=mimetype)

    print("Adding all entries")
    # paths that are skipped here and never copied from the source
    generated_paths = frozenset(
        (
            "title/xapian",
            "fulltext/xapian",
            "listing/titleOrdered/v0",
            "listing/titleOrdered/v1",
        )
    )
    # stays None (no hint emitted) when the source has no front-article listing
    is_front = None
    for index in range(src.all_entry_count):
        entry = src._get_entry_by_id(index)

        # hack to get around the fact we don't know the actual namespace
        if entry.path in metadata:
            continue
        if entry.path == "Counter" or illus_re.match(entry.path):
            continue
        if entry.path in generated_paths:
            continue
        # would be W/mainPage, handled by libzim
        if entry.path == "mainPage" or entry == src.main_entry:
            continue

        if front_articles:
            is_front = entry.path == main_path or index in front_articles

        if debug:
            print(f"> {is_front} -- {entry.path}")

        if entry.is_redirect:
            dst.add_redirection(
                path=entry.path,
                title=entry.title,
                targetPath=entry.get_redirect_entry().path,
                hints={} if is_front is None else {Hint.FRONT_ARTICLE: is_front},
            )
            continue

        dst.add_item(ProxyItem(entry.get_item(), is_front=is_front))

    print("Finishing ZIM…")
    dst.__exit__(None, None, None)
    return 0
def entrypoint():
    """Parse the command line, run `recreate()` and exit with its status.

    The process exits with the value returned by `recreate()`, or with 1 when
    `recreate()` raises. With `--debug`, the traceback of that exception is
    printed as well.
    """
    epilog = (
        """ZIM Metadata spec: https://wiki.openzim.org/wiki/Metadata
Metadata are not restricted to the ones specified in the spec; """
        """but those are the ones that are used by ZIM readers.
Use proper case (Pascal Case) when specifying standard Metadata.
You can use data: URLs to specify non-plain/text metadata using base64 encoding with:
data:<mimetype>;base64,<data>
Illustrations are set as `Illustration_<size>@1` """
        """with size=48x48 for the default illustration.
Examples:
-m "Title=A new Hope"
-m "Illustration_48x48@1="""
        """CQd1PeAAAAGXRFWHRTb2Z0d2FyZQBBZG9iZSBJbWFnZVJlYWR5ccllPAAAAA9JREFUeNpi+P//"""
        """P0CAAQAF/gL+Lc6J7gAAAABJRU5ErkJggg=="
Limitations:
- Compression Hint is readable so is not set. All entries uses default mode
- ZIM cannot contain entries with following paths:
- mainPage
- title/xapian
- fulltext/xapian
- listing/titleOrdered/v0
- listing/titleOrdered/v1
"""
    )

    parser = argparse.ArgumentParser(
        prog="zimrecreate.py",
        description="Recreate a ZIM from another ZIM, possibly changing metadata",
        epilog=epilog,
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("src_path")
    parser.add_argument("dst_path")
    parser.add_argument(
        "-m",
        "--meta",
        dest="new_meta",
        action="append",
        default=[],
        help='New metadata to set on ZIM. Use -m "Title=Better ZIM Title" format',
    )
    parser.add_argument(
        "--threads",
        help="Nb of threads to use in libzim",
        default=4,
        type=int,
        dest="nb_workers",
    )
    parser.add_argument(
        "--debug", help="Enable verbose output", action="store_true", default=False
    )
    parser.add_argument(
        "-v", "--version", action="version", version=f"%(prog)s {__version__}"
    )

    # the parsed option names match recreate()'s keyword arguments one-to-one
    args = dict(vars(parser.parse_args()))
    args["src_path"] = pathlib.Path(args["src_path"])
    args["dst_path"] = pathlib.Path(args["dst_path"])

    try:
        sys.exit(recreate(**args))
    except Exception as exc:
        print(f"ERROR. An {type(exc).__name__} error occurred: {exc}")
        if args["debug"]:
            # Format the caught exception itself: `sys.last_value` only exists
            # in an interactive session, so reading it here would raise.
            print(
                "".join(
                    traceback.format_exception(type(exc), exc, exc.__traceback__)
                )
            )
        raise SystemExit(1)
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    entrypoint()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment