Skip to content

Instantly share code, notes, and snippets.

@alashow
Created May 7, 2022 04:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alashow/8b6536a98819d5e2c1b4eda5c643ca0a to your computer and use it in GitHub Desktop.
Save alashow/8b6536a98819d5e2c1b4eda5c643ca0a to your computer and use it in GitHub Desktop.
devil is in the details
import argparse
import json
from guessit import guessit
from guessit.jsonutils import GuessitEncoder
from pprint import pp, pprint
from tqdm import tqdm
from multiprocessing import cpu_count, Pool
import random
import logging
import sys
import re
import os
# Only ERROR and above are emitted, so the logging.info() calls made by the
# analyzer functions in this file stay silent unless this level is lowered.
logging.basicConfig(level=logging.ERROR)
# Default extensions (text after the last '.', without the dot) that
# read_file_to_list() drops from an input list.
EXTENSIONS_TO_IGNORE = ['nfo-orig', 'srt', 'sub', 'ass', 'jpg']
# Labels get_quality_type() looks for among the values of a guess.
QUALITY_TYPES = ['2160p', '1080p', '720p', '480p', '360p', '240p']
# Used with re.match(): "<title> (YYYY) [" anchored at the start of the path;
# group 1 is read as the title.
FILENAME_REGEX=r'(.*) \([0-9]{4}\) \['
# Used with re.match(): "<title>[ (YYYY)]/[Season ]<number>"; group 1 is read
# as the title.
# NOTE(review): '|' has the lowest precedence, so the second alternative is a
# bare "Specials/" prefix, for which group 1 is None -- confirm whether
# "(<number>|Specials)\/" was the intended grouping.
FILENAME_TV_REGEX_1=r'(.*)( \([0-9]{4}\))?\/([sS]eason )?([0-9]{1,4})|Specials\/'
# Used with re.match(): "<title>/[Season ]<number>/"; group 1 is read as the title.
FILENAME_TV_REGEX_2=r'(.*)\/([sS]eason )?([0-9]{1,4})\/'
# Prefix for the three working directories below. They are built by plain
# string concatenation, so the trailing '/' is required.
# BASE_PATH='/home/alashov/uploader/guess-to-ignore/'
BASE_PATH='/Users/alashov/Dropbox/docs/other/scripts/media/san-ignores/'
# Where the per-list guess JSON files are written to and read from.
GUESSES_PATH=BASE_PATH + 'guesses/'
# Where the "<kind>-missing.txt" / "<kind>-existing.txt" input lists are read from.
INPUT_LISTS_PATH=BASE_PATH + 'input-lists/'
# Where the generated "*-auto-ignores*.txt" lists are written.
OUTPUT_LISTS_PATH=BASE_PATH + 'output-lists/'
def get_quality_type(guess):
    """Return the first value of ``guess`` that is a known quality label.

    Values are examined in the mapping's iteration order and compared
    against ``QUALITY_TYPES``. ``None`` is returned when no value matches.
    """
    known_qualities = (
        candidate for candidate in guess.values() if candidate in QUALITY_TYPES
    )
    return next(known_qualities, None)
def chunks(l, n):
    """Lazily split the sequence ``l`` into consecutive slices of ``n`` items.

    A chunk size below 1 is treated as 1. The last slice may be shorter
    than ``n``. The result is a generator of slices of ``l``.
    """
    size = max(1, n)
    starts = range(0, len(l), size)
    return (l[start:start + size] for start in starts)
def read_file_to_list(file_name, ignore_extensions=None):
    """Read a newline-separated list of file paths, dropping ignored extensions.

    Args:
        file_name: Path of the text file to read, one entry per line.
        ignore_extensions: Extensions (the text after the last '.', without
            the dot) whose entries are skipped. ``None`` (the default) means
            the module-level ``EXTENSIONS_TO_IGNORE``; pass ``[]`` to keep
            every entry.

    Returns:
        The stripped, non-empty lines whose extension is not ignored, in
        file order.

    Raises:
        FileNotFoundError: If ``file_name`` is not an existing regular file.
    """
    # Resolve the default at call time; `is None` (not falsiness) so that an
    # explicit empty list still means "ignore nothing".
    if ignore_extensions is None:
        ignore_extensions = EXTENSIONS_TO_IGNORE
    if not os.path.isfile(file_name):
        raise FileNotFoundError(f'File {file_name} does not exist')
    files = []
    ignored_count = 0
    with open(file_name, 'r') as f:
        for line in f:
            file = line.strip()
            if not file:
                # A blank line is not an entry; returning '' would make the
                # callers analyze or group an empty path.
                continue
            file_extension = file.rsplit('.', 1)[-1]
            if file_extension in ignore_extensions:
                ignored_count += 1
            else:
                files.append(file)
    print(f'{len(files)} files read from {file_name}, ignored {ignored_count} files')
    return files
def write_guesses_to_file_as_json(guesses: dict, file_name):
    """Dump ``guesses`` as JSON to ``<GUESSES_PATH><file_name>.json``.

    The target file is overwritten. Values are encoded with
    ``GuessitEncoder``, non-ASCII characters are written as-is, and the
    output is indented by two spaces.
    """
    target_path = f'{GUESSES_PATH}{file_name}.json'
    with open(target_path, 'w') as out:
        json.dump(guesses, out, cls=GuessitEncoder, ensure_ascii=False, indent=2)
def read_guesses_to_file_as_json(file_name):
    """Load the guesses stored in ``<GUESSES_PATH><file_name>.json``.

    Args:
        file_name: Base name of the guesses file, without directory or
            the ``.json`` suffix.

    Returns:
        The parsed JSON content, or an empty dict when the file does not
        exist.
    """
    path = f'{GUESSES_PATH}{file_name}.json'
    if not os.path.isfile(path):
        return {}
    # Context manager so the handle is closed even if parsing fails.
    with open(path) as f:
        return json.load(f)
def analyze_tv_file_name(file):
    """Build a grouping key and a serialized guess for one TV file path.

    Args:
        file: Path of the file as it appears in the input list.

    Returns:
        A ``(key, serialized)`` tuple.

        * ``(file, None)`` when guessit raises or reports the type 'movie'.
        * Otherwise ``serialized`` is a JSON string of
          ``{'path': file, 'guess': guess}`` and ``key`` is
          ``"<title>-<episode>-<quality>"``. ``key`` falls back to ``file``
          when no episode identifier is available or building the key fails.
    """
    logging.info(f'Analyzing tv file name: {file}')
    try:
        guess = guessit(file)
    except Exception as e:
        logging.error(f'Error processing file: {file}')
        logging.error(e)
        return file, None
    if guess['type'] == 'movie':
        return file, None

    try:
        episode_title = guess.get('episode_title')
        # When no season was guessed, the episode title is stored in its place.
        if 'season' not in guess and episode_title:
            guess['season'] = episode_title

        episode_found = True
        if 'season' in guess and 'episode' in guess:
            episode = f"s{guess['season']}e{guess['episode']}"
        elif 'date' in guess:
            episode = guess['date']
        elif 'episode_title' in guess:
            episode = guess['episode_title']
            print('Overrode episode with episode title', episode)
        elif 'alternative_title' in guess:
            # This branch is only reached when 'episode_title' is absent, so
            # the alternative title is the value to read here.
            episode = guess['alternative_title']
            print('Overrode episode with alternative title', episode)
        else:
            episode_found = False
            episode = None
            print('No episode found: ', file, guess)

        quality = get_quality_type(guess)

        # Prefer a title taken from the path layout; the patterns are tried
        # in order and guessit's own title is the last resort.
        for pattern in (FILENAME_REGEX, FILENAME_TV_REGEX_1, FILENAME_TV_REGEX_2):
            path_match = re.match(pattern, file)
            if path_match:
                title = path_match.group(1)
                break
        else:
            title = guess['title']

        if not title:
            print('No regex match or guess!: ', file)
        if episode_title and title.isnumeric():
            title = episode_title
        # Normalize "Name, The" to "The Name" and drop apostrophes.
        if ', The' in title:
            title = 'The ' + title.replace(', The', '')
        if "'" in title:
            title = title.replace("'", "")

        # Without an episode identifier the key would be meaningless, so the
        # raw path is used as the key.
        key = f"{title}-{episode}-{quality}" if episode_found else file
    except Exception:
        # Best effort: any failure while deriving the key falls back to the
        # raw path, but leave a trace for debugging.
        logging.debug('Could not build key for %s', file, exc_info=True)
        key = file

    data = {'path': file, 'guess': guess}
    serialized = json.dumps(data, cls=GuessitEncoder, ensure_ascii=False)
    return key, serialized
def analyze_movie_file_name(file):
    """Build a grouping key and a serialized guess for one movie file path.

    Args:
        file: Path of the file as it appears in the input list.

    Returns:
        A ``(key, serialized)`` tuple.

        * ``(file, None)`` when guessit raises.
        * Otherwise ``serialized`` is a JSON string of
          ``{'path': file, 'guess': guess}`` and ``key`` is
          ``"<title>-<year>-<quality>"`` with apostrophes removed from the
          title and quality "unknown" when none is found. ``key`` falls back
          to ``file`` when building the key fails.
    """
    logging.info(f'Analyzing movie file name: {file}')
    try:
        guess = guessit(file)
    except Exception as e:
        logging.error(f'Error processing file: {file}')
        logging.error(e)
        return file, None
    try:
        quality = get_quality_type(guess) or "unknown"
        year = guess['year']
        title = guess['title'].replace("'", "")
        key = f"{title}-{year}-{quality}"
    except Exception:
        # Best-effort fallback to the raw path. `Exception` rather than a
        # bare `except:` so KeyboardInterrupt/SystemExit still propagate.
        logging.debug('Could not build key for %s', file, exc_info=True)
        key = file
    data = {'path': file, 'guess': guess}
    serialized = json.dumps(data, cls=GuessitEncoder, ensure_ascii=False)
    return key, serialized
def build_guesses_for_file_names(files, filename_analyzer, parallel=True):
    """Run ``filename_analyzer`` over ``files`` and group the results by key.

    Args:
        files: Sequence of file paths to analyze.
        filename_analyzer: Callable taking one path and returning a
            ``(key, serialized_json_or_None)`` tuple. It is sent to worker
            processes when ``parallel`` is true.
        parallel: When true, spread the work over ``cpu_count()`` worker
            processes; results then arrive in arbitrary order. When false,
            analyze sequentially in this process.

    Returns:
        A dict mapping each key to the list of decoded JSON payloads that
        produced it. Results with an empty payload are dropped.
    """
    if parallel:
        # The pool is created only when needed and released on exit. All
        # results are consumed inside the `with` block, before teardown.
        with Pool(cpu_count()) as pool:
            results = list(
                tqdm(pool.imap_unordered(filename_analyzer, files), total=len(files))
            )
    else:
        results = [filename_analyzer(file) for file in tqdm(files)]

    guesses = {}
    for key, data in results:
        if data:
            guesses.setdefault(key, []).append(json.loads(data))
    return guesses
def analyze_tv_files(analyze_missing=None, analyze_existing=None):
    """Analyze the TV input lists and store the resulting guesses as JSON.

    Args:
        analyze_missing: Whether to process ``tv-missing.txt``. ``None``
            (the default) asks on stdin; only the answer 'y' (any case)
            enables it.
        analyze_existing: Same, for ``tv-existing.txt``.

    Both questions are asked before any analysis starts. Guesses are written
    to the 'tv-missing' and 'tv-existing' guess files respectively.
    """
    # Ask before analyzing when the caller did not decide.
    if analyze_missing is None:
        analyze_missing = input('Analyze missing tv files? (y/n) ').lower() == 'y'
    if analyze_existing is None:
        analyze_existing = input('Analyze existing tv files? (y/n) ').lower() == 'y'

    if analyze_missing:
        tv_missing_files = read_file_to_list(f'{INPUT_LISTS_PATH}tv-missing.txt')
        print(f'{len(tv_missing_files)} missing tv files, analyzing...')
        tv_missing_guesses = build_guesses_for_file_names(tv_missing_files, analyze_tv_file_name)
        write_guesses_to_file_as_json(tv_missing_guesses, 'tv-missing')
    if analyze_existing:
        tv_existing_files = read_file_to_list(f'{INPUT_LISTS_PATH}tv-existing.txt')
        print(f'{len(tv_existing_files)} existing tv files, analyzing...')
        tv_existing_guesses = build_guesses_for_file_names(tv_existing_files, analyze_tv_file_name)
        write_guesses_to_file_as_json(tv_existing_guesses, 'tv-existing')
def analyze_movie_files(analyze_missing=True, analyze_existing=True):
    """Analyze the movie input lists and store the resulting guesses as JSON.

    Args:
        analyze_missing: Whether to process ``movies-missing.txt``. Passing
            ``None`` explicitly asks on stdin; only the answer 'y' (any case)
            enables it.
        analyze_existing: Same, for ``movies-existing.txt``.

    Both defaults are ``True``, so a call without arguments analyzes both
    lists without prompting. Guesses are written to the 'movies-missing' and
    'movies-existing' guess files respectively.
    """
    # Ask only when the caller explicitly passed None.
    if analyze_missing is None:
        analyze_missing = input('Analyze missing movie files? (y/n) ').lower() == 'y'
    if analyze_existing is None:
        analyze_existing = input('Analyze existing movie files? (y/n) ').lower() == 'y'

    if analyze_missing:
        missing_files = read_file_to_list(f'{INPUT_LISTS_PATH}movies-missing.txt')
        print(f'{len(missing_files)} missing movie files, analyzing...')
        missing_guesses = build_guesses_for_file_names(missing_files, analyze_movie_file_name)
        write_guesses_to_file_as_json(missing_guesses, 'movies-missing')
    if analyze_existing:
        existing_files = read_file_to_list(f'{INPUT_LISTS_PATH}movies-existing.txt')
        print(f'{len(existing_files)} existing movie files, analyzing...')
        existing_guesses = build_guesses_for_file_names(existing_files, analyze_movie_file_name)
        write_guesses_to_file_as_json(existing_guesses, 'movies-existing')
def find_existing_tv_missing_files():
    """List "missing" TV files whose key also appears among the existing ones.

    Reads the 'tv-missing' and 'tv-existing' guess files, collects the
    ``path`` of every missing entry whose key is present in the existing
    guesses, overwrites ``tv-auto-ignores.txt`` with those paths and appends
    them to ``tv-auto-ignores-latest.txt`` (one path per line).

    Returns:
        The list of collected paths.
    """
    missing_by_key = read_guesses_to_file_as_json('tv-missing')
    existing_by_key = read_guesses_to_file_as_json('tv-existing')

    existing_files = []
    for key, entries in missing_by_key.items():
        if key not in existing_by_key:
            continue
        # A key maps either to a list of entries or to a single entry.
        if not isinstance(entries, list):
            entries = [entries]
        existing_files.extend(entry['path'] for entry in entries)

    print(f'Found {len(existing_files)} existing tv files')
    with open(f'{OUTPUT_LISTS_PATH}tv-auto-ignores.txt', 'w') as ignore_file, \
            open(f'{OUTPUT_LISTS_PATH}tv-auto-ignores-latest.txt', 'a') as latest_ignore_file:
        for path in existing_files:
            line = path + '\n'
            ignore_file.write(line)
            latest_ignore_file.write(line)
    return existing_files
def find_existing_movie_missing_files():
    """List "missing" movie files whose key also appears among the existing ones.

    Reads the 'movies-missing' and 'movies-existing' guess files, collects
    the ``path`` of every missing entry whose key is present in the existing
    guesses, overwrites ``movies-auto-ignores.txt`` with those paths and
    appends them to ``movies-auto-ignores-latest.txt`` (one path per line).

    Returns:
        The list of collected paths.
    """
    missing_by_key = read_guesses_to_file_as_json('movies-missing')
    existing_by_key = read_guesses_to_file_as_json('movies-existing')

    existing_files = []
    for key, entries in missing_by_key.items():
        if key not in existing_by_key:
            continue
        existing_files.extend(entry['path'] for entry in entries)

    print(f'Found {len(existing_files)} existing movie files to ignore, writing to file...')
    with open(f'{OUTPUT_LISTS_PATH}movies-auto-ignores.txt', 'w') as ignore_file, \
            open(f'{OUTPUT_LISTS_PATH}movies-auto-ignores-latest.txt', 'a') as latest_ignore_file:
        for path in existing_files:
            line = path + '\n'
            ignore_file.write(line)
            latest_ignore_file.write(line)
    return existing_files
def find_new_shows_to_ignore(file_name, min_episode_count=8):
    """Print every file of each show that has at least ``min_episode_count`` files.

    The list in ``file_name`` is read without any extension filtering. Files
    are grouped by the first '/'-separated component of their path, and the
    qualifying groups are printed in ascending order of file count (groups
    with equal counts keep their first-seen order).
    """
    paths = read_file_to_list(file_name, ignore_extensions=[])

    files_by_show = {}
    for path in paths:
        files_by_show.setdefault(path.split('/')[0], []).append(path)

    ordered_shows = sorted(files_by_show.items(), key=lambda entry: len(entry[1]))
    for _show_name, show_paths in ordered_shows:
        if len(show_paths) < min_episode_count:
            continue
        for path in show_paths:
            print(path)
if __name__ == '__main__':
    # Usage: <script> <command> [args...]
    # The first argument selects the command; the rest are passed to it.
    arguments = sys.argv[2:]
    command = sys.argv[1] if (len(sys.argv) > 1) else None
    if command == 'interactive':
        # Ask which media kinds to analyze.
        analyze_tv = input('Analyze tv files? (y/n) ').lower() == 'y'
        if analyze_tv:
            analyze_tv_files()
        analyze_movies = input('Analyze movie files? (y/n) ').lower() == 'y'
        if analyze_movies:
            analyze_movie_files()
    elif command == 'analyze-missing-tv':
        analyze_tv_files(analyze_missing=True, analyze_existing=False)
    elif command == 'analyze-existing-tv':
        analyze_tv_files(analyze_missing=False, analyze_existing=True)
    elif command == 'analyze-existing-movies':
        analyze_movie_files(analyze_missing=False, analyze_existing=True)
    elif command == 'analyze-find-missing-tv':
        analyze_tv_files(analyze_missing=True, analyze_existing=False)
        find_existing_tv_missing_files()
    elif command == 'analyze-find-missing-movies':
        analyze_movie_files(analyze_missing=True, analyze_existing=False)
        find_existing_movie_missing_files()
    elif command == 'analyze-find-missing-existing-movies':
        analyze_movie_files(analyze_missing=True, analyze_existing=True)
        find_existing_movie_missing_files()
    elif command == 'find-missing-movies':
        find_existing_movie_missing_files()
    elif command == 'find-missing-tv':
        find_existing_tv_missing_files()
    elif command == 'find_new_shows_to_ignore':
        # This command needs the path of the file list to inspect; fail with
        # a usage message instead of an IndexError traceback.
        if not arguments:
            sys.exit('Usage: find_new_shows_to_ignore <file-list-path>')
        find_new_shows_to_ignore(arguments[0])
    else:
        print("Nothing to do")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment