Skip to content

Instantly share code, notes, and snippets.

@daedric
Created May 7, 2020 11:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daedric/6c3bfadceef09392e8be4d094a57c992 to your computer and use it in GitHub Desktop.
Save daedric/6c3bfadceef09392e8be4d094a57c992 to your computer and use it in GitHub Desktop.
from pathlib import Path
import re
import os
import string
import xml.etree.ElementTree as ET
import hashlib
import argparse
# Command-line interface.
# --source and --patched are mandatory: the script builds a Path from each of
# them right after parsing, so a missing value would otherwise surface as an
# opaque TypeError instead of a usage message.
parser = argparse.ArgumentParser(description='Monkeypatch video files with manually downloaded one')
parser.add_argument('--source', required=True,
                    help='Directory containing the original videos')
parser.add_argument('--patched', required=True,
                    help='Directory containing the patched videos')
# Without --replace the script only prints what it would do (dry run).
parser.add_argument('--replace', action='store_true',
                    help='Actually replace the video found')
args = parser.parse_args()
def sha256sum(filename):
    """Return the hexadecimal SHA-256 digest of the file at *filename*.

    The file is read unbuffered into a single reusable 128 KiB buffer, so
    memory use stays constant whatever the file size.
    """
    digest = hashlib.sha256()
    window = memoryview(bytearray(128 * 1024))
    with open(filename, "rb", buffering=0) as stream:
        while True:
            count = stream.readinto(window)
            if count == 0:
                # End of file reached.
                break
            # Only the first ``count`` bytes of the buffer are fresh data.
            digest.update(window[:count])
    return digest.hexdigest()
# Lower-case file extensions that identify a video file.
_VIDEO_SUFFIXES = frozenset({".mkv", ".avi", ".iso"})


def find_videos(d):
    """Yield the video files found at path *d*.

    When *d* is a directory, its direct children (no recursion) that carry a
    video extension are yielded; sub-directories are skipped even if their
    name ends with a video extension. When *d* is not a directory, *d* itself
    is yielded if it carries a video extension.

    Extensions are compared case-insensitively, so ``.MKV`` matches too.
    """
    if d.is_dir():
        for entry in d.iterdir():
            if entry.suffix.lower() in _VIDEO_SUFFIXES and not entry.is_dir():
                yield entry
    elif d.suffix.lower() in _VIDEO_SUFFIXES:
        # ``elif``: a directory named like a video (e.g. a release folder
        # called "Show.S01E01.mkv") must not be reported as a video itself.
        yield d
def remove_hash(f):
    """Delete the ``.hash`` sidecar file stored next to *f*, if there is one.

    The sidecar path is *f* with its extension replaced by ``.hash``. This is
    best-effort clean-up: a missing sidecar is not an error, and any other
    filesystem failure is reported on stdout instead of aborting the caller.
    """
    fhash = f.with_suffix(".hash")
    try:
        fhash.unlink()
    except FileNotFoundError:
        # No hash was ever cached for this file: nothing to clean up.
        pass
    except OSError as err:
        print("Could not remove hash file {}: {}".format(fhash, err))
def get_hash(f):
    """Return the SHA-256 hex digest of *f*, cached in a ``.hash`` sidecar.

    The sidecar lives next to *f* (same name, ``.hash`` extension). If it
    already exists its stripped content is returned as-is, without checking
    that it still matches the file. Otherwise the digest is computed with
    ``sha256sum``, written to the sidecar and returned.
    """
    cache_file = f.with_suffix(".hash")
    if not cache_file.exists():
        digest = sha256sum(f)
        cache_file.write_text(digest)
        return digest
    return cache_file.read_text().strip()
# Locations given on the command line.
source = Path(args.source)
patched = Path(args.patched)
# Map every source video to a zero-argument callable returning its hash, so
# the (expensive) hashing only happens when a comparison actually needs it.
# ``f`` is bound as a default argument on purpose: a plain ``lambda:
# get_hash(f)`` would look ``f`` up when called, i.e. after the comprehension
# has finished, and every entry would then hash the *last* video.
videos = {f: (lambda f=f: get_hash(f)) for f in find_videos(source)}
def look_for_tvinfo(source):
    """Return the path of the nearest ``tvshow.nfo`` at or above *source*.

    The search starts in *source* (or in its parent directory when *source*
    is not a directory, e.g. a single video file) and walks up through every
    ancestor until the filesystem root.

    Raises:
        FileNotFoundError: no ``tvshow.nfo`` exists in any visited directory.
    """
    # Resolve first so that relative paths such as "." have real ancestors;
    # otherwise ``Path(".").parent`` is "." again and the walk never ends.
    start = Path(source).resolve()
    if not start.is_dir():
        start = start.parent
    # ``parents`` stops at the root by construction, so termination does not
    # depend on recognising the root by name (``Path("/").name`` is "").
    for directory in (start, *start.parents):
        candidate = directory / "tvshow.nfo"
        if candidate.is_file():
            return candidate
    raise FileNotFoundError("could not find the tvshow.nfo")
def data(root, node_path):
    """Return the text of the one element matching *node_path* under *root*.

    *node_path* is passed to ``root.findall``. Exactly one match is expected;
    zero or several matches raise an ``Exception`` reporting the count found.
    The returned text is whatever the element's ``text`` attribute holds.
    """
    matches = root.findall(node_path)
    if len(matches) == 1:
        return matches[0].text
    raise Exception(
        "children count for {}: 1 != {}".format(node_path, len(matches))
    )
# Translation table turning every character of ``string.punctuation`` into a
# single space; built once instead of on every call.
_PUNCT_TO_SPACE = str.maketrans(dict.fromkeys(string.punctuation, " "))


def remove_punct(s):
    """Return *s* with each ASCII punctuation character replaced by a space.

    Punctuation is replaced rather than deleted so that words joined by
    punctuation ("Show.S01E01") are separated by a later ``split()``.
    The result always has the same length as *s*.
    """
    return s.translate(_PUNCT_TO_SPACE)
# --- Identify the show -------------------------------------------------------
# The title is read from the tvshow.nfo found at or above the source location
# and reduced to a set of words used to recognise candidate names.
tvinfo = look_for_tvinfo(source)
tree = ET.parse(tvinfo)
root = tree.getroot()
title = data(root, "./title")
print("Monkey patch serie: {}".format(title))
title = remove_punct(title)
title_words = frozenset(title.split())

# --- Collect the episode tags (SxxEyy) present in the source ------------------
season = re.compile(r"S(\d+)E(\d+)")
episodes = []
for f in videos:
    m = season.search(f.name)
    if not m:
        print("Ignore file: {}".format(f.name))
        # No episode tag in the name: there is nothing to match against.
        continue
    episodes.append(m.group(0))
episodes = frozenset(episodes)

# --- Match every entry of the patched directory against the source ------------
for f in patched.iterdir():
    name = f.name
    name = remove_punct(name)
    name = frozenset(name.split())
    # The candidate name must contain every word of the show title...
    if not (title_words <= name):
        continue
    # ...and, as a standalone word, the tag of an episode we own.
    episode = episodes & name
    if not episode:
        continue
    episode = next(iter(episode))

    # A candidate is either a video file or a directory holding exactly one.
    maybe_replacement = list(find_videos(f))
    if not maybe_replacement:
        print("Did not find a video in {}".format(f.name))
        continue
    if len(maybe_replacement) > 1:
        print("Find too many videos in {}: {}".format(f.name, maybe_replacement))
        continue
    maybe_replacement = maybe_replacement[0]
    maybe_replacement_stat = maybe_replacement.stat()

    # Base name (without extension) of the file to create in the source.
    if maybe_replacement.samefile(f):
        # The candidate is the video itself: drop its extension here, it is
        # appended again below (keeping it would produce "x.mkv.mkv").
        name = maybe_replacement.stem
    else:
        # The candidate is a directory: reuse the directory name.
        name = f.name

    # Iterate over a snapshot because replaced entries are removed from
    # ``videos`` inside the loop.
    for original, hash_ in list(videos.items()):
        if episode not in original.name:
            continue
        if os.path.samestat(maybe_replacement_stat, original.stat()):
            # Already a hard link to the candidate: nothing to do.
            continue
        if hash_() == get_hash(maybe_replacement):
            # Different file but identical content: nothing to do.
            continue
        remplacement_name = original.with_name(name + maybe_replacement.suffix)
        print("Replace file: {} with {} using name: {}".format(original.name, maybe_replacement, remplacement_name))
        if args.replace:
            # Hard-link the candidate into place first, and only then delete
            # the original together with its cached hash.
            os.link(maybe_replacement, remplacement_name)
            original.unlink()
            remove_hash(original)
            # The original no longer exists; forget it so that a later
            # candidate for the same episode does not try to stat() it.
            del videos[original]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment