-
-
Save arkarkark/eef9bb9cfedbc6507a8255e543dd5d1e to your computer and use it in GitHub Desktop.
add imdb ratings to your itunes movie collection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Copyright 2019 Alex K (wtwf.com) | |
# PYTHON_ARGCOMPLETE_OK | |
# This lives at https://gist.github.com/arkarkark/eef9bb9cfedbc6507a8255e543dd5d1e | |
# Update it with | |
# gist -u eef9bb9cfedbc6507a8255e543dd5d1e ~/bin/share/imdb_movie_ratings_adder | |
"""Put IMDB ratings into the bpm field of the selected movies in iTunes. | |
Also updates the MPAA content rating from the IMDB data.
If you want to update all the other metadata then I recommend https://metaz.io | |
""" | |
# pip3 install appscript argcomplete html2text requests
# put an API key from http://www.omdbapi.com/apikey.aspx in ~/.omdbapikey | |
# brew install atomicparsley | |
# curl -o - "http://www.omdbapi.com/?apikey=$(cat ~/.omdbapikey)&t=ghostbusters" | json_pp | |
# curl -o - "http://www.omdbapi.com/?apikey=$(cat ~/.omdbapikey)&t=big%20mouth&season=1&episode=2" | |
# other options for data thetvdb.com, themoviedb.org | |
__author__ = "wtwf.com (Alex K)" | |
import argparse | |
import collections | |
import csv | |
import json | |
import logging | |
import math | |
import os | |
import queue | |
import re | |
import readline | |
import subprocess | |
import threading | |
import time | |
import types | |
import urllib.parse | |
import appscript | |
import argcomplete | |
import html2text | |
import requests | |
def movie_from_dict(movie_dict):
    """Build a ``Movie`` namedtuple whose fields are the keys of *movie_dict*.

    The field names are sorted, so the tuple layout does not depend on the
    order in which the dictionary was filled.
    """
    # types.SimpleNamespace(**movie_dict) would have worked here as well.
    field_names = sorted(movie_dict)
    movie_type = collections.namedtuple("Movie", field_names)
    return movie_type(**movie_dict)
def imdb_rating_to_itunes_bpm(imdb_rating):
    """Scale an IMDb rating by ten and round it down to an integer.

    Args:
        imdb_rating: The rating as a number or a numeric string, e.g. ``"7.8"``.

    Returns:
        ``floor(float(imdb_rating) * 10)`` as an int, or ``None`` when the
        value cannot be converted (for example ``"N/A"`` or ``None``). The
        conversion error is logged in that case.
    """
    try:
        return math.floor(float(imdb_rating) * 10)
    except (TypeError, ValueError, OverflowError) as err:
        # TypeError: None or another non-numeric object; ValueError: text such
        # as "N/A" (or NaN); OverflowError: an infinite value.
        logging.error(err)
        return None
def _build_arg_parser():
    """Return the argparse parser that declares every command line flag."""
    parser = argparse.ArgumentParser()
    add_flag = parser.add_argument

    add_flag("--app", choices=["Music", "TV"], default="TV", help="App to add to.")
    add_flag(
        "-a",
        "--all",
        action="store_true",
        help="just select the first match. (and open imdb in a browser)",
    )
    add_flag(
        "-q",
        "--quick",
        action="store_true",
        help="If there is a title/year match select that, otherwise ask",
    )
    add_flag(
        "--csv",
        action="store_true",
        help="produce a csv file of all the selected movies",
    )
    add_flag("-e", "--episodes", action="store_true", help="update episode info")
    add_flag(
        "--everything", action="store_true", help="update all fields not just ratings"
    )
    add_flag("--imdb-id", help="use this imdb id")
    add_flag("--tvmaze-id", help="use this tvmaze id")

    # The --no-* switches store False into a positively named destination
    # that defaults to True.
    add_flag(
        "-n",
        "--no-write",
        action="store_false",
        default=True,
        dest="write",
        help="Do not update files or iTunes",
    )
    add_flag(
        "--no-mpaa",
        action="store_false",
        default=True,
        dest="mpaa",
        help="Do not update mpaa rating",
    )
    add_flag(
        "--no-imdb",
        action="store_false",
        default=True,
        dest="imdb",
        help="Do not use imdb",
    )

    add_flag(
        "--open", action="store_true", help="open imdb in a browser for each movie"
    )
    add_flag(
        "--whole-show", action="store_true", help="get data for whole show (TV Ratings)"
    )
    add_flag("-r", "--rating", type=int, help="set this rating")
    add_flag("-s", "--season", type=int, help="override season info")
    add_flag("-t", "--title", help="override show title")
    add_flag("-v", "--verbose", action="store_true", help="Log verbosely")
    add_flag("-d", "--debug", action="store_true", help="Log debug messages")
    return parser


def main():
    """Parse args and do the thing."""
    logging.basicConfig()
    # Seed the readline history with "0".
    readline.add_history("0")

    parser = _build_arg_parser()
    argcomplete.autocomplete(parser)
    args = parser.parse_args()

    # --debug is applied after --verbose, so it wins when both are given.
    root_logger = logging.getLogger()
    if args.verbose:
        root_logger.setLevel(logging.INFO)
    if args.debug:
        root_logger.setLevel(logging.DEBUG)
    logging.info("Log")

    # Episode / whole-show mode updates TV metadata; otherwise rate movies.
    wants_tv_info = args.episodes or args.whole_show
    handler = add_imdb_info_to_selection if wants_tv_info else AddImdbRatings
    handler(args)
def add_imdb_info_to_selection(args):
    """Update each selected track with episode (or whole-show) metadata.

    For every track in the selection of ``args.app`` the lookup services are
    tried in order (OMDb first unless ``--no-imdb`` was given, then TVmaze).
    The first reply that is not an error is applied with
    ``update_track_with_data``; error replies are logged and the next service
    is tried.

    Args:
        args: The parsed command line namespace. Reads ``app``, ``season``,
            ``imdb_id``, ``tvmaze_id``, ``title``, ``imdb``, ``whole_show``
            and ``write``.
    """
    itunes = appscript.app(args.app)
    # LookupAPI's constructor reads ~/.omdbapikey, so only build it when OMDb
    # will actually be queried; --no-imdb must work without an OMDb key.
    lookup_api = LookupAPI() if args.imdb else None

    for track in itunes.selection.get():
        print(f"\n\nFILE: {track.name.get()}")
        episode = track.episode_number.get()
        season = args.season or track.season_number.get() or 1
        imdb_id = args.imdb_id
        tvmaze_id = args.tvmaze_id
        show = args.title or track.show.get()
        print(show, imdb_id, season, episode)

        # An explicit TVmaze id identifies the show just as well as a name
        # or an IMDb id does.
        if not ((show or imdb_id or tvmaze_id) and season and episode):
            continue

        api_funcs = []
        if lookup_api is not None:
            api_funcs.append(lookup_api.lookup_episode)
        if args.whole_show:
            api_funcs.append(TvMazeApi.lookup_show)
        else:
            api_funcs.append(TvMazeApi.lookup_episode)

        for func in api_funcs:
            data = func(
                season, episode, show=show, imdb_id=imdb_id, tvmaze_id=tvmaze_id
            )
            # A lookup may report failure as a plain {"Error": ...} dict;
            # normalise it so the attribute check below sees the error.
            if isinstance(data, dict):
                data = types.SimpleNamespace(**data)
            if hasattr(data, "Error"):
                logging.error("ERROR: %r", data.Error)
                continue
            update_track_with_data(track, data, args.write)
            break
def prefill_input(text, prompt=""):
    """Read a line from the user with *text* pre-inserted into the edit buffer.

    Args:
        text: Text inserted into the readline buffer before editing starts.
        prompt: Prompt string handed to ``input``.

    Returns:
        The submitted line with leading and trailing whitespace removed.

    Raises:
        Whatever ``input`` raises (e.g. ``EOFError``, ``KeyboardInterrupt``);
        the pre-input hook is removed before the exception propagates.
    """

    def hook():
        readline.insert_text(text)
        readline.redisplay()

    readline.set_pre_input_hook(hook)
    try:
        result = input(prompt)
    finally:
        # Uninstall the hook even if input() is interrupted, so a later
        # prompt does not get this text inserted again.
        readline.set_pre_input_hook()
    return result.strip()
class TvMazeApi:
    """Small client for the TVmaze REST API (http://api.tvmaze.com).

    Replies are translated into ``types.SimpleNamespace`` objects that use
    OMDb-style attribute names (``Title``, ``Plot``, ``Year``,
    ``imdbRating``). When the show cannot be identified, the lookup methods
    return a namespace carrying a single ``Error`` attribute, so callers can
    detect failure with ``hasattr(result, "Error")``.
    """

    # Seconds to wait for a TVmaze reply before giving up.
    TIMEOUT = 10

    @staticmethod
    def _resolve_show_id(show=None, imdb_id=None, tvmaze_id=None):
        """Return a TVmaze show id: the explicit id, else by IMDb id, else by name."""
        if not tvmaze_id and imdb_id:
            tvmaze_id = TvMazeApi.get_show_id_from_imdb_id(imdb_id)
        if not tvmaze_id:
            tvmaze_id = TvMazeApi.get_show_id_from_name(show)
        return tvmaze_id

    @staticmethod
    def _show_not_found(show, imdb_id):
        """Return the error result used when no show id could be resolved."""
        return types.SimpleNamespace(
            Error=f"Could not find show_id for '{show}' or imdb_id:'{imdb_id}'"
        )

    @staticmethod
    def _fetch_show_id(url, params):
        """Return the ``id`` field of a TVmaze lookup reply, or None on failure."""
        try:
            reply = requests.get(url=url, params=params, timeout=TvMazeApi.TIMEOUT)
            return reply.json().get("id")
        except (requests.RequestException, ValueError, AttributeError) as err:
            # Network failure, a non-JSON body, or a JSON reply that is not an
            # object: treat all of them as "show not found".
            logging.debug("TVmaze id lookup failed for %r: %r", params, err)
            return None

    @staticmethod
    def lookup_show(season, episode, show=None, imdb_id=None, tvmaze_id=None):
        """Fetch show-level data.

        ``season`` and ``episode`` are accepted only so the signature matches
        ``lookup_episode``; they are not used.

        Returns:
            The namespace built by ``call``, or an ``Error`` namespace when
            the show id cannot be resolved.
        """
        print("lookup_show", show)
        tvmaze_id = TvMazeApi._resolve_show_id(show, imdb_id, tvmaze_id)
        if not tvmaze_id:
            return TvMazeApi._show_not_found(show, imdb_id)
        url = f"http://api.tvmaze.com/shows/{tvmaze_id}"
        return TvMazeApi.call(url)

    @staticmethod
    def lookup_episode(season, episode, show=None, imdb_id=None, tvmaze_id=None):
        """Fetch data for one episode, identified by season and episode number.

        Returns:
            The namespace built by ``call``, or an ``Error`` namespace when
            the show id cannot be resolved.
        """
        tvmaze_id = TvMazeApi._resolve_show_id(show, imdb_id, tvmaze_id)
        if not tvmaze_id:
            return TvMazeApi._show_not_found(show, imdb_id)
        params = {"season": season, "number": episode}
        url = f"http://api.tvmaze.com/shows/{tvmaze_id}/episodebynumber"
        return TvMazeApi.call(url, params)

    @staticmethod
    def call(url, params=None):
        """GET *url* and map the JSON reply onto OMDb-style attribute names.

        ``name`` and ``summary`` are converted from HTML to text and stored as
        ``Title`` and ``Plot``; the first four characters of ``airdate``
        become the integer ``Year``; ``rating["average"]`` becomes
        ``imdbRating``. Keys that are missing or empty in the reply are left
        out of the result.

        Raises:
            requests.RequestException: on network errors or timeout.
        """
        reply = requests.get(
            url=url, params=params or {}, timeout=TvMazeApi.TIMEOUT
        ).json()
        print(reply)
        mapping = {"name": "Title", "summary": "Plot"}
        data = {}
        for src_key, data_key in mapping.items():
            val = reply.get(src_key)
            if val:
                data[data_key] = html2text.html2text(val).strip()
        year = reply.get("airdate")
        if year:
            data["Year"] = int(year[0:4])
        rating = reply.get("rating")
        if rating:
            data["imdbRating"] = rating.get("average")
        print(data)
        return types.SimpleNamespace(**data)

    @staticmethod
    def get_show_id_from_imdb_id(imdb_id):
        """Return the TVmaze id of the show with this IMDb id, or None."""
        if not imdb_id:
            return None
        return TvMazeApi._fetch_show_id(
            "http://api.tvmaze.com/lookup/shows", {"imdb": imdb_id}
        )

    @staticmethod
    def get_show_id_from_name(show):
        """Return the TVmaze id of the best match for a show name, or None."""
        if not show:
            return None
        return TvMazeApi._fetch_show_id(
            "http://api.tvmaze.com/singlesearch/shows", {"q": show}
        )
def update_track_with_data(track, data, write):
    """Copy the usable fields of *data* onto the matching properties of *track*.

    Args:
        track: Object exposing ``name``, ``description``, ``year``, ``genre``,
            ``comment`` and ``grouping`` properties, each with a ``set`` method.
        data: Lookup result; attributes are read by their IMDb-style names.
        write: When false, the new values are printed but not stored.
    """
    field_map = (
        ("Title", "name"),
        ("Plot", "description"),
        ("Year", "year"),
        ("Genre", "genre"),
        ("imdbID", "comment"),
        ("imdbRating", "grouping"),
        # ("Actors", "artist") is intentionally not copied.
    )
    for source_attr, track_attr in field_map:
        value = getattr(data, source_attr, None)
        # Skip attributes that are absent, empty, or placeholder text.
        if not value or value == "N/A" or value == "Not Found":
            continue

        if track_attr == "genre":
            # Keep only the first entry of a comma separated list.
            value = value.split(",")[0].strip()
        elif track_attr == "comment":
            value = f"https://www.imdb.com/title/{value}"
        elif track_attr in ("grouping", "bpm"):
            value = imdb_rating_to_itunes_bpm(value)

        # The conversion above may have produced an empty value.
        if not value:
            continue
        print(f"{track_attr}: {value}")
        if write:
            getattr(track, track_attr).set(value)
class AddImdbRatings:
    """Attach IMDb ratings (and MPAA content ratings) to the selected tracks.

    Constructing an instance immediately processes the current selection of
    the application named by ``args.app``.
    """

    def __init__(self, args):
        self.args = args
        self.itunes = appscript.app(args.app)
        self.lookup_api = LookupAPI()
        self.add_imdb_ratings_to_selection()

    def movie_choice(
        self, movie_name, movie_year=None, choose_first=False, quick=False
    ):
        """Search for *movie_name* and let the user pick the matching title.

        Args:
            movie_name: Title to search for; a trailing "(sml)" or "(4k)"
                marker is removed before searching.
            movie_year: Optional year passed on to the search.
            choose_first: Return the first search result (or None) without
                prompting.
            quick: Return a result whose title and year match exactly without
                prompting.

        Returns:
            A movie record (a search result, or a namedtuple holding only
            ``imdbID`` or only ``imdbRating``), ``None`` to skip this title,
            or ``False`` when the user typed "quit".
        """
        print(f" Retrieving data from IMDb for {movie_name} ({movie_year})")
        match = re.search(r" *\((sml|4k)\) *$", movie_name, re.IGNORECASE)
        if match:
            movie_name = movie_name[: match.start()]
        movies = self.lookup_api.search_by_name_and_year(movie_name, movie_year)
        print(
            f" https://www.imdb.com/find?s=all&q={urllib.parse.quote_plus(movie_name)}"
        )
        if not movies:
            print(f""" No matches found for "{movie_name}" made in {movie_year}""")
            if choose_first:
                return None
            print("> Type a new search term, hit enter to skip,")
        else:
            if choose_first:
                return movies[0]
            lines = []
            for movie in movies:
                if (
                    quick
                    and movie_year
                    and movie_name.lower() == movie.Title.lower()
                    and str(movie_year) == str(movie.Year)
                ):
                    return movie
                movie_type = f" {movie.Type}" if movie.Type != "movie" else ""
                movie_extra = f"({movie.Year}{movie_type})"
                movie_link = f"""https://imdb.com/title/{movie.imdbID}"""
                lines.append(f"{movie_link} {movie.Title} {movie_extra}")
            print(" Potential Title Matches")
            print("\n".join([f" {i}. {j}" for i, j in enumerate(lines)]))
            # ask user what movie to use
            print(
                "> Type number of correct title (enter for first), a new search, 'no' to skip,"
            )
        print("> paste in an imdb url, or enter a score between 10 and 100")
        reply = prefill_input("" if movies else movie_name)

        # Accepting the unchanged pre-filled name without a year would only
        # repeat the same failed search, so treat it as "skip".
        if reply == movie_name and not movie_year:
            reply = "no"
        if reply == "":
            reply = "0" if movies else "no"
        if reply == "no":
            return None
        if reply == "quit":
            return False

        try:
            reply_int = int(reply)
        except ValueError:
            # Not a number: either an IMDb url/id or a new search term.
            match = re.search(r"(tt[0-9]+)", reply)
            if match:
                return movie_from_dict({"imdbID": match.group(1)})
            # start a new search with whatever was entered
            return self.movie_choice(reply)

        if 10 < reply_int <= 100:
            return movie_from_dict({"imdbRating": reply_int * 0.1})
        if 0 <= reply_int < len(movies):
            return movies[reply_int]
        # An index outside the printed list used to raise IndexError.
        print(f"> {reply_int} is not one of the listed titles, skipping")
        return None

    def add_imdb_ratings_to_selection(self):
        """Process every selected track, optionally writing a CSV instead.

        With ``--csv`` a CSV file named after the first browser window's view
        is written and nothing else is updated. Otherwise ratings are stored
        on the tracks, queued content-rating writes are flushed, and the
        application is asked to refresh the selection.
        """
        csv_file = None
        csv_writer = None
        if self.args.csv:
            csv_filename = self.itunes.browser_windows()[0].view.name() + ".csv"
            logging.info("Writing CSV: %s", csv_filename)
            csv_file = open(csv_filename, "w", newline="")
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(
                ["field1", "field2", "const", "field4", "description", "field6"]
            )
        try:
            for track in self.itunes.selection.get():
                if not self._process_track(track, csv_writer):
                    break
            if csv_file:
                print("All Done Closing csv file")
        finally:
            if csv_file:
                csv_file.close()
            # Always stop the background AtomicParsley worker: it is a
            # non-daemon thread and would keep the process alive forever if an
            # error above skipped this call.
            Mp4Info.finalize()
        if csv_file:
            return
        # now refresh the selections
        subprocess.check_output(
            [
                "/usr/bin/osascript",
                "-e",
                """tell application "%s" to refresh selection""" % self.args.app,
            ]
        )

    def _process_track(self, track, csv_writer):
        """Handle one track; return False when the user asked to quit."""
        if self.args.write or not self.args.all:
            print("\n\n\n\n")
        movie_name = track.show.get() or track.name.get()
        movie_year = track.year.get()
        movie_imdb_id = None
        match = re.search(r"(tt[0-9]+)", track.comment.get())
        if match:
            movie_imdb_id = match.group(1)
        print(movie_name)

        if self.args.rating:
            movie_imdb_rating = str(float(self.args.rating) / 10.0)
            movie = types.SimpleNamespace(imdbRating=movie_imdb_rating)
        elif csv_writer and movie_imdb_id:
            movie = types.SimpleNamespace(imdbID=movie_imdb_id)
        else:
            movie = self.movie_choice(
                movie_name, movie_year, self.args.all, self.args.quick
            )
        if movie is False:
            return False
        if not movie:
            return True

        # Records built from a typed-in score or --rating carry no imdbID.
        imdb_id = getattr(movie, "imdbID", None)
        # Remember a newly found id in the track comment, but only when
        # writing is allowed (--no-write must not touch the track).
        if self.args.write and imdb_id and imdb_id != movie_imdb_id:
            track.comment.set(f"{imdb_id} \n{track.comment.get()}")

        if self.args.open and imdb_id:
            subprocess.call(["open", f"https://www.imdb.com/title/{imdb_id}"])
            if self.args.all:
                time.sleep(1)
        if csv_writer:
            csv_writer.writerow(
                ["", "", imdb_id or "", movie_name, movie_year or "", ""]
            )
            return True
        if not self.args.write:
            return True

        full_movie = movie
        if not hasattr(full_movie, "imdbRating"):
            if not imdb_id:
                return True
            full_movie = self.lookup_api.full_movie_info(imdb_id)
            if hasattr(full_movie, "Error"):
                logging.error("ERROR: %r", full_movie.Error)
                return True

        # IMDB rating
        if self.args.everything:
            update_track_with_data(track, full_movie, self.args.write)
        else:
            imdb_rating = getattr(full_movie, "imdbRating", None)
            rating = (
                imdb_rating_to_itunes_bpm(imdb_rating)
                if imdb_rating is not None
                else None
            )
            if rating:
                print(f"Setting imdb rating (grouping) to {rating}")
                track.grouping.set(rating)

        # MPAA rating
        rated = getattr(full_movie, "Rated", None)
        if (
            not self.args.mpaa
            or not rated
            or rated in ("N/A", "Not Rated", "Unrated")
        ):
            return True
        location = track.location.get().path
        mp4info = Mp4Info(location)
        content_rating = mp4info.content_rating
        if not content_rating or rated != content_rating.content_rating:
            mp4info.content_rating = rated
        # Original author's note: itunes.refresh(track),
        # itunes.selection.refresh and itunes.add(location) do not work here,
        # hence the osascript refresh after the loop.
        return True
class LookupAPI:
    """Thin client for the OMDb API (http://www.omdbapi.com/).

    JSON replies are decoded into ``types.SimpleNamespace`` objects, so fields
    are read as attributes (``reply.Title``, ``reply.Search`` ...). The API
    key is read from ``~/.omdbapikey`` when the client is created.
    """

    # Seconds to wait for an OMDb reply before giving up.
    TIMEOUT = 10

    def __init__(self):
        with open(os.path.expanduser("~/.omdbapikey")) as key_file:
            self.omdbapikey = key_file.read().strip()

    def data(self, query):
        """Send *query* to OMDb and return the decoded reply.

        The API key is added to a copy of *query*; the caller's dict is left
        untouched.

        Raises:
            requests.RequestException: on network errors or timeout.
        """
        params = dict(query, apikey=self.omdbapikey)
        reply = requests.get(
            url="http://www.omdbapi.com/", params=params, timeout=self.TIMEOUT
        )
        return json.loads(
            reply.text,
            object_hook=lambda d: types.SimpleNamespace(**d),
        )

    def lookup_episode(self, season, episode, show=None, imdb_id=None, tvmaze_id=None):
        """Look up one episode by IMDb id when given, otherwise by show title.

        ``tvmaze_id`` is accepted only for signature compatibility with the
        TVmaze lookups; it is not used.
        """
        params = {"episode": episode, "season": season}
        if imdb_id:
            params["i"] = imdb_id
        else:
            params["t"] = show
        return self.data(params)

    def search(self, query, year=0):
        """Return the list of search hits for *query*.

        When a *year* is given and produces no hits, the search is repeated
        without the year. Error replies are logged. An empty list is returned
        when nothing is found.
        """
        params = {"s": query}
        if year:
            params["y"] = str(year)
        objs = self.data(params)
        if hasattr(objs, "Error"):
            logging.error("ERROR: %r", objs.Error)
        if hasattr(objs, "Search"):
            return objs.Search
        if year:
            # no results found, try without a year
            return self.search(query)
        return []

    def full_movie_info(self, imdb_id):
        """Return the full OMDb record for an IMDb id."""
        return self.data({"i": imdb_id})

    @staticmethod
    def _first_year(value):
        """Return the first four-digit number in *value* as an int, or None."""
        match = re.search(r"\d{4}", str(value))
        return int(match.group(0)) if match else None

    def search_by_name_and_year(self, movie_name, movie_year=0):
        """Search by name and keep the hits released within a year of *movie_year*.

        Hits without a year, and all hits when *movie_year* is falsy, are kept
        unfiltered. A hit whose year is a range such as "2011–2019" is
        compared by its first year instead of being discarded. Hits whose year
        holds no four-digit number (e.g. "N/A") are dropped when filtering.
        """
        movies = []
        movie_results = self.search(movie_name, movie_year) or []
        wanted_year = self._first_year(movie_year) if movie_year else None
        for movie_result in movie_results:
            result_year = getattr(movie_result, "Year", None)
            if not (movie_year and result_year):
                movies.append(movie_result)
                continue
            # check that the year tag in the file name matches with the
            # release date, otherwise not the movie we are looking for
            found_year = self._first_year(result_year)
            if (
                wanted_year is not None
                and found_year is not None
                and abs(found_year - wanted_year) < 2
            ):
                movies.append(movie_result)
        return movies
class Mp4Info:
    """Read and write the content rating of a media file with AtomicParsley.

    Reading runs AtomicParsley synchronously. Writing only queues the request;
    a single background thread, shared by all instances, performs the queued
    writes one at a time. Call ``Mp4Info.finalize()`` to wait for them.
    """

    # Pending write requests; a None entry tells the worker to stop.
    things_to_wait_for = queue.Queue()
    # The background thread, or None while no worker is running.
    worker = None

    def __init__(self, filename):
        self.filename = filename

    @property
    def content_rating(self):
        """Return the stored content rating, or None when there is none.

        The result has ``org``, ``content_rating`` and ``code`` attributes,
        taken from the first three "|"-separated parts of the iTunEXTC atom.
        A value with fewer than three parts is treated as missing.
        """
        output = subprocess.check_output(["AtomicParsley", self.filename, "-t"])
        match = re.search(
            r"com.apple.iTunes;iTunEXTC. contains: (.*)",
            output.decode(errors="replace"),
        )
        if not match:
            return None
        logging.debug("match.group(1): %r", match.group(1))
        arr = match.group(1).split("|")
        logging.debug("Mp4Info.content_rating: %r", arr)
        if len(arr) < 3:
            # Unexpected layout; report "no rating" rather than IndexError.
            logging.warning("Unexpected iTunEXTC value: %r", match.group(1))
            return None
        return types.SimpleNamespace(org=arr[0], content_rating=arr[1], code=arr[2])

    @content_rating.setter
    def content_rating(self, rating):
        """Queue a write of *rating* and start the worker thread if needed."""
        print(f"Setting content rating to {rating}")
        Mp4Info.things_to_wait_for.put(
            types.SimpleNamespace(filename=self.filename, rating=rating)
        )
        if not Mp4Info.worker:
            logging.debug("making a worker thread")
            Mp4Info.worker = threading.Thread(target=Mp4Info.work)
            Mp4Info.worker.start()

    @staticmethod
    def finalize():
        """Wait until every queued write has been attempted, then stop the worker."""
        if not Mp4Info.things_to_wait_for.empty():
            print("Waiting for AtomicParsley to finish writing content ratings...")
        logging.debug("waiting for queue to be empty")
        if Mp4Info.worker:
            Mp4Info.things_to_wait_for.put(None)
            logging.debug("waiting for worker thread to be done")
            Mp4Info.worker.join()
            Mp4Info.worker = None

    @staticmethod
    def work():
        """Worker loop: run queued writes until the None sentinel arrives."""
        while True:
            item = Mp4Info.things_to_wait_for.get()
            try:
                logging.debug("Mp4Info.work: %r", item)
                if item is None:
                    break
                subprocess.check_output(
                    [
                        "AtomicParsley",
                        item.filename,
                        "--overWrite",
                        "--contentRating",
                        item.rating,
                    ]
                )
                logging.debug("done with AtomicParsley")
            except (subprocess.CalledProcessError, OSError) as err:
                # One failed file must not kill the worker: the remaining
                # queued writes still have to run.
                logging.error(
                    "AtomicParsley failed for %r: %s", item.filename, err
                )
            finally:
                # Mark the item done only after it has been handled.
                Mp4Info.things_to_wait_for.task_done()
# Run the command line tool only when this file is executed as a script.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Bring Terminal to the front and run the ratings adder through its
-- "do script" command; the trailing "exit 0" exits the shell afterwards.
tell application "Terminal"
	activate
	set currentTab to do script "/Users/ark/bin/share/imdb_movie_ratings_adder; exit 0"
end tell
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment