Created
May 7, 2020 11:57
-
-
Save daedric/6c3bfadceef09392e8be4d094a57c992 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pathlib import Path | |
import re | |
import os | |
import string | |
import xml.etree.ElementTree as ET | |
import hashlib | |
import argparse | |
# Command-line interface.
# --source and --patched are mandatory: both are turned into Path objects
# right after parsing, so a missing value would otherwise surface as an
# unhelpful TypeError instead of a usage message.
parser = argparse.ArgumentParser(description='Monkeypatch video files with manually downloaded one')
parser.add_argument('--source', required=True,
                    help='Directory containing the original videos')
parser.add_argument('--patched', required=True,
                    help='Directory containing the patched videos')
# Without --replace the script only prints what it would do.
parser.add_argument('--replace', action='store_true',
                    help='Actually replace the video found')
args = parser.parse_args()
def sha256sum(filename):
    """Return the hexadecimal SHA-256 digest of the file at *filename*.

    The file is opened unbuffered and read in chunks of up to 128 KiB into
    a single reusable buffer, so the whole file is never held in memory.
    """
    digest = hashlib.sha256()
    view = memoryview(bytearray(128 * 1024))
    with open(filename, "rb", buffering=0) as stream:
        while True:
            count = stream.readinto(view)
            if count == 0:  # end of file
                break
            # Only the first `count` bytes of the buffer are fresh data.
            digest.update(view[:count])
    return digest.hexdigest()
def find_videos(d):
    """Yield the video paths found at *d*.

    When *d* is a directory, every direct entry whose suffix is ``.mkv``,
    ``.avi`` or ``.iso`` is yielded (the search does not recurse).
    Afterwards *d* itself is yielded when its own suffix is one of those,
    which is how a path pointing straight at a video file is handled.
    """
    video_suffixes = {".mkv", ".avi", ".iso"}
    if d.is_dir():
        yield from (
            entry for entry in d.iterdir() if entry.suffix in video_suffixes
        )
    if d.suffix in video_suffixes:
        yield d
def remove_hash(f):
    """Delete the ``.hash`` file that sits next to *f*, if there is one.

    The hash file is *f* with its suffix replaced by ``.hash``. A missing
    hash file is not an error. Any other OS-level failure is reported on
    stdout and otherwise ignored, so clean-up stays best-effort.
    """
    fhash = f.with_suffix(".hash")
    try:
        fhash.unlink()
    except FileNotFoundError:
        pass  # nothing was cached for this file
    except OSError as err:
        print("Could not remove {}: {}".format(fhash, err))
def get_hash(f):
    """Return the SHA-256 hex digest of *f*, using a ``.hash`` file as cache.

    The cache file is *f* with its suffix replaced by ``.hash``. When it
    exists, its stripped content is returned without reading *f*. Otherwise
    the digest is computed with ``sha256sum``, written to the cache file and
    returned.
    """
    cache = f.with_suffix(".hash")
    if not cache.exists():
        digest = sha256sum(f)
        cache.write_text(digest)
        return digest
    return cache.read_text().strip()
source = Path(args.source)
patched = Path(args.patched)
# Map every source video to a zero-argument callable that returns its hash,
# so a file is only hashed if a candidate replacement is actually compared
# against it.
# `f=f` freezes the current path as a default argument. A plain
# `lambda: get_hash(f)` would look `f` up when called, i.e. after the
# comprehension has finished, and every entry would hash the last video.
videos = {f: lambda f=f: get_hash(f) for f in find_videos(source)}
def look_for_tvinfo(source):
    """Return the closest ``tvshow.nfo`` at or above *source*.

    The search starts in *source* (or in its parent directory when *source*
    is not a directory) and walks up one directory at a time, returning the
    first entry named exactly ``tvshow.nfo``.

    Raises:
        FileNotFoundError: no ``tvshow.nfo`` was found before reaching the
            top of the path. For a relative *source* the walk stops at
            ``.`` and does not continue above the working directory.
    """
    directory = source if source.is_dir() else source.parent
    while True:
        for entry in directory.iterdir():
            if entry.name == "tvshow.nfo":
                return entry
        parent = directory.parent
        # The top of a path is its own parent ("/" or "."). Comparing the
        # name with "/" never matches, because the root's name is "".
        if parent == directory:
            raise FileNotFoundError("could not find the tvshow.nfo")
        directory = parent
def data(root, node_path):
    """Return the text of the single element matching *node_path* under *root*.

    Raises:
        Exception: the path matches no element, or more than one.
    """
    matches = root.findall(node_path)
    if len(matches) == 1:
        return matches[0].text
    raise Exception(
        "children count for {}: 1 != {}".format(node_path, len(matches))
    )
def remove_punct(s):
    """Return *s* with each ``string.punctuation`` character turned into a space.

    Punctuation is replaced rather than dropped, so words it separated stay
    separate when the result is split on whitespace.
    """
    punctuation = frozenset(string.punctuation)
    return "".join(" " if ch in punctuation else ch for ch in s)
# Read the show title from the tvshow.nfo found at or above the source path.
tvinfo = look_for_tvinfo(source)
tree = ET.parse(tvinfo)
root = tree.getroot()
title = data(root, "./title")
print("Monkey patch serie: {}".format(title))
# The title is reduced to a set of words so it can be matched against the
# words of each patched entry's name regardless of order or punctuation.
title = remove_punct(title)
title_words = frozenset(title.split())
# Collect the "SxxEyy" tag of every source video.
season = re.compile(r"S(\d+)E(\d+)")
episodes = []
for f in videos:
    m = season.search(f.name)
    if not m:
        print("Ignore file: {}".format(f.name))
        # Skip files without an episode tag; `m` is None here, so falling
        # through to m.group() would raise AttributeError.
        continue
    episodes.append(m.group(0))
episodes = frozenset(episodes)
# For every entry of the patched directory that names this show and one of
# the known episodes, hard-link its single video next to the matching source
# video(s) and remove the original (only when --replace is given).
for f in patched.iterdir():
    words = frozenset(remove_punct(f.name).split())
    # The entry must contain every word of the show title...
    if not title_words.issubset(words):
        continue
    # ...and the tag of an episode that exists in the source.
    matching = episodes & words
    if not matching:
        continue
    episode = list(matching)[0]

    candidates = list(find_videos(f))
    if not candidates:
        print("Did not find a video in {}".format(f.name))
        continue
    if len(candidates) > 1:
        print("Find too many videos in {}: {}".format(f.name, candidates))
        continue
    maybe_replacement = candidates[0]
    maybe_replacement_stat = maybe_replacement.stat()

    # The new file is named after the patched entry.
    if maybe_replacement.samefile(f):
        name = maybe_replacement.name
    else:
        name = f.name

    for original, hash_ in videos.items():
        if episode not in original.name:
            continue
        # Nothing to do when the source already is the replacement: either
        # the very same inode (hard link) or identical content (same hash).
        if (
            os.path.samestat(maybe_replacement_stat, original.stat())
            or hash_() == get_hash(maybe_replacement)
        ):
            continue
        # Append the video extension to the entry name.
        # NOTE(review): when the patched entry is itself the video file,
        # `name` already ends with that extension, so the result looks like
        # "x.mkv.mkv" - confirm whether this is intended.
        remplacement_name = original.with_name(name)
        remplacement_name = remplacement_name.with_suffix(
            remplacement_name.suffix + maybe_replacement.suffix
        )
        print("Replace file: {} with {} using name: {}".format(original.name, maybe_replacement, remplacement_name))
        if args.replace:
            # NOTE(review): os.link raises if the target already exists.
            os.link(maybe_replacement, remplacement_name)
            original.unlink()
            remove_hash(original)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment