Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Add IMDb ratings to your iTunes movie collection
#!/usr/bin/env python3
# Copyright 2019 Alex K (wtwf.com)
# PYTHON_ARGCOMPLETE_OK
# This lives at https://gist.github.com/arkarkark/eef9bb9cfedbc6507a8255e543dd5d1e
# Update it with
# gist -u eef9bb9cfedbc6507a8255e543dd5d1e ~/bin/share/imdb_movie_ratings_adder
"""Put IMDB ratings into the grouping field of the selected movies in iTunes.
Also updates the MPAA content rating from the IMDB data.
If you want to update all the other metadata then I recommend https://metaz.io
"""
# pip3 install appscript argcomplete html2text requests
# put an API key from http://www.omdbapi.com/apikey.aspx in ~/.omdbapikey
# brew install atomicparsley
# curl -o - "http://www.omdbapi.com/?apikey=$(cat ~/.omdbapikey)&t=ghostbusters" | json_pp
# curl -o - "http://www.omdbapi.com/?apikey=$(cat ~/.omdbapikey)&t=big%20mouth&season=1&episode=2"
# other options for data thetvdb.com, themoviedb.org
__author__ = "wtwf.com (Alex K)"
import argparse
import collections
import csv
import json
import logging
import math
import os
import queue
import re
import readline
import subprocess
import threading
import time
import types
import urllib.parse
import appscript
import argcomplete
import html2text
import requests
def movie_from_dict(movie_dict):
    """Build an immutable ``Movie`` record from a plain dict.

    Args:
        movie_dict: Mapping of field name to value. Keys become attribute
            names, so they must be valid Python identifiers.

    Returns:
        A ``Movie`` namedtuple whose fields are the sorted keys of
        ``movie_dict``. Keys that were not supplied do not exist as
        attributes on the result, so optional fields should be read with
        ``getattr``/``hasattr``.
    """
    # I could have used types.SimpleNamespace(**d) too
    movie_type = collections.namedtuple("Movie", sorted(movie_dict))
    return movie_type(**movie_dict)
def imdb_rating_to_itunes_bpm(imdb_rating):
    """Scale an IMDb-style rating (0-10) to the integer stored in iTunes.

    Args:
        imdb_rating: Anything ``float()`` accepts, e.g. ``"8.7"`` or ``8.7``.

    Returns:
        ``floor(rating * 10)`` as an int (``"8.7"`` -> ``87``), or ``None``
        when the value cannot be converted (``"N/A"``, ``None``, NaN,
        infinity). The failure is logged rather than raised.
    """
    try:
        return math.floor(float(imdb_rating) * 10)
    except (TypeError, ValueError, OverflowError) as err:
        # TypeError: None / non-numeric objects; ValueError: "N/A" or NaN;
        # OverflowError: infinity cannot be floored to an int.
        logging.error(err)
        return None
def main():
    """Parse args and do the thing.

    Builds the command-line parser, configures logging from ``--verbose`` /
    ``--debug``, then dispatches: ``--episodes`` or ``--whole-show`` run
    ``add_imdb_info_to_selection``; otherwise ``AddImdbRatings`` processes the
    current selection.
    """
    logging.basicConfig()
    # Seed the readline history with "0" (presumably so the first choice can
    # be recalled with the up arrow at the selection prompt).
    readline.add_history("0")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--app", default="TV", choices=["Music", "TV"], help="App to add to."
    )
    parser.add_argument(
        "-a",
        "--all",
        help="just select the first match. (and open imdb in a browser)",
        action="store_true",
    )
    parser.add_argument(
        "-q",
        "--quick",
        help="If there is a title/year match select that, otherwise ask",
        action="store_true",
    )
    parser.add_argument(
        "--csv",
        help="produce a csv file of all the selected movies",
        action="store_true",
    )
    parser.add_argument(
        "-e", "--episodes", help="update episode info", action="store_true"
    )
    parser.add_argument(
        "--everything", help="update all fields not just ratings", action="store_true"
    )
    parser.add_argument("--imdb-id", help="use this imdb id")
    parser.add_argument("--tvmaze-id", help="use this tvmaze id")
    # The --no-* switches store False into a positively named destination.
    parser.add_argument(
        "-n",
        "--no-write",
        dest="write",
        default=True,
        help="Do not update files or iTunes",
        action="store_false",
    )
    parser.add_argument(
        "--no-mpaa",
        dest="mpaa",
        default=True,
        help="Do not update mpaa rating",
        action="store_false",
    )
    parser.add_argument(
        "--no-imdb",
        dest="imdb",
        default=True,
        help="Do not use imdb",
        action="store_false",
    )
    parser.add_argument(
        "--open", help="open imdb in a browser for each movie", action="store_true"
    )
    parser.add_argument(
        "--whole-show", help="get data for whole show (TV Ratings)", action="store_true"
    )
    parser.add_argument("-r", "--rating", help="set this rating", type=int)
    parser.add_argument("-s", "--season", help="override season info", type=int)
    parser.add_argument("-t", "--title", help="override show title")
    parser.add_argument("-v", "--verbose", help="Log verbosely", action="store_true")
    parser.add_argument("-d", "--debug", help="Log debug messages", action="store_true")

    # Must run before parse_args() so shell completion can hook the parser.
    argcomplete.autocomplete(parser)
    args = parser.parse_args()

    # --debug is applied last, so it wins when both flags are given.
    if args.verbose:
        logging.getLogger().setLevel(logging.INFO)
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
    logging.info("Log")

    if args.episodes or args.whole_show:
        add_imdb_info_to_selection(args)
    else:
        AddImdbRatings(args)
def add_imdb_info_to_selection(args):
    """Update episode/show metadata for every track selected in the app.

    For each selected track the season, episode and show title are read from
    the track (overridable with ``--season`` / ``--title``), then each
    configured lookup is tried in order until one returns data without an
    ``Error``; that data is handed to ``update_track_with_data``.

    Args:
        args: Parsed command-line namespace. Reads ``app``, ``season``,
            ``imdb_id``, ``tvmaze_id``, ``title``, ``imdb``, ``whole_show``
            and ``write``.
    """
    itunes = appscript.app(args.app)
    # Only build the OMDb client (which reads ~/.omdbapikey) when it will
    # actually be used, so --no-imdb works without an API key file.
    lookup_api = LookupAPI() if args.imdb else None

    for track in itunes.selection.get():
        print(f"\n\nFILE: {track.name.get()}")
        episode = track.episode_number.get()
        season = args.season or track.season_number.get() or 1
        imdb_id = args.imdb_id
        tvmaze_id = args.tvmaze_id
        show = args.title or track.show.get()
        print(show, imdb_id, season, episode)

        if not ((show or imdb_id) and season and episode):
            continue

        # OMDb first (unless disabled), then TVmaze as a fallback.
        api_funcs = []
        if lookup_api is not None:
            api_funcs.append(lookup_api.lookup_episode)
        if args.whole_show:
            api_funcs.append(TvMazeApi.lookup_show)
        else:
            api_funcs.append(TvMazeApi.lookup_episode)

        for func in api_funcs:
            data = func(
                season, episode, show=show, imdb_id=imdb_id, tvmaze_id=tvmaze_id
            )
            # Lookups may report failure either as an attribute or as a dict
            # key named "Error"; treat both as an error.
            if isinstance(data, dict):
                failed = "Error" in data
                error = data.get("Error")
            else:
                failed = hasattr(data, "Error")
                error = getattr(data, "Error", None)
            if failed:
                logging.error("ERROR: %r", error)
            else:
                update_track_with_data(track, data, args.write)
                break
def prefill_input(text, prompt=""):
    """Prompt for a line of input with ``text`` already typed in for editing.

    Args:
        text: Text inserted into the readline buffer before the user types.
        prompt: Prompt string passed to ``input``.

    Returns:
        The entered line with surrounding whitespace stripped.

    Raises:
        EOFError, KeyboardInterrupt: Propagated from ``input``; the readline
            hook is removed first.
    """

    def hook():
        readline.insert_text(text)
        readline.redisplay()

    readline.set_pre_input_hook(hook)
    try:
        result = input(prompt)
    finally:
        # Always clear the hook so a later input() call is not pre-filled
        # with stale text if this one was interrupted.
        readline.set_pre_input_hook()
    return result.strip()
class TvMazeApi:
    """Small stateless client for the api.tvmaze.com endpoints used here.

    Successful lookups return a ``types.SimpleNamespace`` with some of the
    attributes ``Title``, ``Plot``, ``Year`` and ``imdbRating``. Failures
    return a namespace carrying an ``Error`` attribute, which is what callers
    test for with ``hasattr(data, "Error")``.
    """

    @staticmethod
    def _resolve_show_id(show=None, imdb_id=None, tvmaze_id=None):
        """Return a TVmaze show id: the explicit one, else by IMDb id, else by name."""
        if not tvmaze_id and imdb_id:
            tvmaze_id = TvMazeApi.get_show_id_from_imdb_id(imdb_id)
        if not tvmaze_id:
            tvmaze_id = TvMazeApi.get_show_id_from_name(show)
        return tvmaze_id

    @staticmethod
    def _not_found(show, imdb_id):
        """Return the error namespace used when no show id can be resolved."""
        return types.SimpleNamespace(
            Error=f"Could not find show_id for '{show}' or imdb_id:'{imdb_id}'"
        )

    @staticmethod
    def _fetch_show_id(url, params):
        """GET ``url`` and return the ``id`` field of the JSON reply, or None."""
        try:
            return requests.get(url=url, params=params).json().get("id")
        except (requests.RequestException, ValueError, AttributeError) as err:
            # Best effort: network failure, a non-JSON body, or a JSON body
            # that is not an object all mean "no id found".
            logging.debug("TVmaze id lookup failed for %s %r: %s", url, params, err)
            return None

    @staticmethod
    def lookup_show(season, episode, show=None, imdb_id=None, tvmaze_id=None):
        """Fetch show-level data.

        ``season`` and ``episode`` are accepted only so the signature matches
        the other lookup functions; they are not used.
        """
        print("lookup_show", show)
        tvmaze_id = TvMazeApi._resolve_show_id(show, imdb_id, tvmaze_id)
        if not tvmaze_id:
            return TvMazeApi._not_found(show, imdb_id)
        url = f"http://api.tvmaze.com/shows/{tvmaze_id}"
        return TvMazeApi.call(url)

    @staticmethod
    def lookup_episode(season, episode, show=None, imdb_id=None, tvmaze_id=None):
        """Fetch data for one episode identified by season and episode number."""
        tvmaze_id = TvMazeApi._resolve_show_id(show, imdb_id, tvmaze_id)
        if not tvmaze_id:
            return TvMazeApi._not_found(show, imdb_id)
        params = {"season": season, "number": episode}
        url = f"http://api.tvmaze.com/shows/{tvmaze_id}/episodebynumber"
        return TvMazeApi.call(url, params)

    @staticmethod
    def call(url, params=None):
        """GET ``url`` and translate the JSON reply into OMDb-style fields.

        Mapping: ``name`` -> ``Title``, ``summary`` -> ``Plot`` (both passed
        through html2text), the first four characters of ``airdate`` ->
        ``Year`` (int), and ``rating["average"]`` -> ``imdbRating``.
        """
        reply = requests.get(url=url, params=params or {}).json()
        print(reply)
        if not isinstance(reply, dict):
            # e.g. a JSON null or list; there is nothing to map.
            return types.SimpleNamespace(Error=f"Unexpected reply from {url}")

        mapping = {"name": "Title", "summary": "Plot"}
        data = {}
        for src_key, data_key in mapping.items():
            val = reply.get(src_key)
            if val:
                data[data_key] = html2text.html2text(val).strip()
        year = reply.get("airdate")
        if year:
            data["Year"] = int(year[0:4])
        rating = reply.get("rating")
        if rating:
            data["imdbRating"] = rating.get("average")
        print(data)
        return types.SimpleNamespace(**data)

    @staticmethod
    def get_show_id_from_imdb_id(imdb_id):
        """Return the TVmaze show id for an IMDb id, or None if not found."""
        return TvMazeApi._fetch_show_id(
            "http://api.tvmaze.com/lookup/shows", {"imdb": imdb_id}
        )

    @staticmethod
    def get_show_id_from_name(show):
        """Return the TVmaze show id of the best name match, or None."""
        return TvMazeApi._fetch_show_id(
            "http://api.tvmaze.com/singlesearch/shows", {"q": show}
        )
def update_track_with_data(track, data, write):
    """Copy looked-up metadata fields onto a track.

    Args:
        track: Object whose ``name``, ``description``, ``year``, ``genre``,
            ``comment`` and ``grouping`` attributes each expose ``set(value)``
            (presumably an appscript track reference).
        data: Object with optional attributes ``Title``, ``Plot``, ``Year``,
            ``Genre``, ``imdbID`` and ``imdbRating``. Missing, empty,
            ``"N/A"`` and ``"Not Found"`` values are skipped.
        write: When false, values are printed but nothing is set.
    """
    imdb_to_itunes = {
        "Title": "name",
        "Plot": "description",
        "Year": "year",
        "Genre": "genre",
        "imdbID": "comment",
        "imdbRating": "grouping",
        # "Actors": "artist",
    }
    for imdb_key, itunes_key in imdb_to_itunes.items():
        value = getattr(data, imdb_key, None)
        if not value or value in ("N/A", "Not Found"):
            continue

        if itunes_key == "genre":
            # Only the first of a comma-separated genre list is kept.
            value = value.split(",")[0].strip()
        elif itunes_key == "comment":
            value = f"https://www.imdb.com/title/{value}"
        elif itunes_key in ("grouping", "bpm"):
            value = imdb_rating_to_itunes_bpm(value)

        # The conversions above can yield an empty/None value; skip those.
        if value:
            print(f"{itunes_key}: {value}")
            if write:
                getattr(track, itunes_key).set(value)
class AddImdbRatings:
    """Interactively add IMDb ratings (and MPAA ratings) to selected tracks.

    Constructing an instance immediately processes the current selection of
    the application named by ``args.app``.
    """

    def __init__(self, args):
        """Store ``args``, connect to the app and OMDb, then run."""
        self.args = args
        self.itunes = appscript.app(args.app)
        self.lookup_api = LookupAPI()
        self.add_imdb_ratings_to_selection()

    def movie_choice(
        self, movie_name, movie_year=None, choose_first=False, quick=False
    ):
        """Search for a movie and let the user pick the right match.

        Args:
            movie_name: Title to search for; a trailing "(sml)" or "(4k)"
                marker is removed before searching.
            movie_year: Optional release year used to narrow the search.
            choose_first: Return the first match (or None) without prompting.
            quick: Return an exact title + year match without prompting.

        Returns:
            A search result or a ``movie_from_dict`` record (which may carry
            only ``imdbRating`` or only ``imdbID``); ``None`` to skip this
            track; ``False`` when the user typed "quit".
        """
        print(f" Retrieving data from IMDb for {movie_name} ({movie_year})")
        match = re.search(r" *\((sml|4k)\) *$", movie_name, re.IGNORECASE)
        if match:
            movie_name = movie_name[0 : -1 * len(match.group(0))]
        movies = self.lookup_api.search_by_name_and_year(movie_name, movie_year)
        print(
            f" https://www.imdb.com/find?s=all&q={urllib.parse.quote_plus(movie_name)}"
        )

        if not movies:
            print(f""" No matches found for "{movie_name}" made in {movie_year}""")
            if choose_first:
                return None
            print("> Type a new search term, hit enter to skip,")
        else:
            if choose_first:
                return movies[0]
            lines = []
            for movie in movies:
                if (
                    quick
                    and movie_year
                    and movie_name.lower() == movie.Title.lower()
                    and str(movie_year) == str(movie.Year)
                ):
                    return movie
                movie_type = f" {movie.Type}" if movie.Type != "movie" else ""
                movie_extra = f"({movie.Year}{movie_type})"
                movie_link = f"""https://imdb.com/title/{movie.imdbID}"""
                lines.append(f"{movie_link} {movie.Title} {movie_extra}")
            print(" Potential Title Matches")
            print("\n".join([f" {i}. {j}" for i, j in enumerate(lines)]))
            # ask user what movie to use
            print(
                "> Type number of correct title (enter for first), a new search, 'no' to skip,"
            )
        print("> paste in an imdb url, or enter a score between 10 and 100")

        # With no matches the search term is pre-filled so it can be edited.
        reply = prefill_input("" if movies else movie_name)
        if reply == movie_name and not movie_year:
            # Unchanged pre-filled term: searching again would just loop.
            reply = "no"
        if reply == "":
            reply = "0" if movies else "no"
        if reply == "no":
            return None
        if reply == "quit":
            return False

        try:
            reply_int = int(reply)
        except ValueError:
            # It's not a number so it must be an imdb url/id or a new search.
            match = re.search(r"(tt[0-9]+)", reply)
            if match:
                return movie_from_dict({"imdbID": match.group(1)})
            # start a new search with whatever was entered
            return self.movie_choice(reply)

        if 0 <= reply_int < len(movies):
            return movies[reply_int]
        if 10 <= reply_int <= 100:
            # A number that is not a listed index is taken as a manual score.
            return movie_from_dict({"imdbRating": reply_int * 0.1})
        # Out-of-range number: ask again instead of raising IndexError.
        print(f"> {reply} is not one of the listed choices")
        return self.movie_choice(movie_name, movie_year, choose_first, quick)

    def add_imdb_ratings_to_selection(self):
        """Process every selected track, then flush pending file writes.

        In ``--csv`` mode a CSV named after the current view is written
        instead and no track ratings are changed. Otherwise queued content
        rating writes are awaited and the app is asked to refresh the
        selection.
        """
        csv_file = None
        csv_writer = None
        if self.args.csv:
            csv_filename = self.itunes.browser_windows()[0].view.name() + ".csv"
            logging.info("Writing CSV: %s", csv_filename)
            csv_file = open(csv_filename, "w", newline="")
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(
                ["field1", "field2", "const", "field4", "description", "field6"]
            )

        try:
            for track in self.itunes.selection.get():
                if not self._process_track(track, csv_writer):
                    break
        finally:
            # Close the CSV even if a lookup or the app raised part-way.
            if csv_file:
                csv_file.close()

        if csv_file:
            print("All Done Closing csv file")
            return

        Mp4Info.finalize()
        # now refresh the selections
        subprocess.check_output(
            [
                "/usr/bin/osascript",
                "-e",
                """tell application "%s" to refresh selection""" % self.args.app,
            ]
        )

    def _process_track(self, track, csv_writer):
        """Handle one track; return False when the user asked to quit."""
        if self.args.write or not self.args.all:
            print("\n\n\n\n")
        movie_name = track.show.get() or track.name.get()
        movie_year = track.year.get()
        # A previously stored imdb id in the comment avoids a new search.
        movie_imdb_id = None
        match = re.search(r"(tt[0-9]+)", track.comment.get())
        if match:
            movie_imdb_id = match.group(1)
        print(movie_name)

        if self.args.rating:
            movie_imdb_rating = str(float(self.args.rating) / 10.0)
            movie = types.SimpleNamespace(imdbRating=movie_imdb_rating)
        elif csv_writer and movie_imdb_id:
            movie = types.SimpleNamespace(imdbID=movie_imdb_id)
        else:
            movie = self.movie_choice(
                movie_name, movie_year, self.args.all, self.args.quick
            )
            # A manually entered score has no imdbID, so read it defensively.
            chosen_id = getattr(movie, "imdbID", None) if movie else None
            if chosen_id and chosen_id != movie_imdb_id and self.args.write:
                track.comment.set(f"{chosen_id} \n{track.comment.get()}")

        if movie is False:
            return False
        if not movie:
            return True

        if self.args.open and hasattr(movie, "imdbID"):
            subprocess.call(["open", f"https://www.imdb.com/title/{movie.imdbID}"])
            if self.args.all:
                # Pace the browser when opening a page for every track.
                time.sleep(1)

        if csv_writer:
            csv_writer.writerow(
                ["", "", getattr(movie, "imdbID", ""), movie_name, movie_year or "", ""]
            )
            return True
        if not self.args.write:
            return True

        full_movie = movie
        if not hasattr(full_movie, "imdbRating"):
            full_movie = self.lookup_api.full_movie_info(movie.imdbID)

        # IMDB rating
        if self.args.everything:
            update_track_with_data(track, full_movie, self.args.write)
        else:
            # An error reply from the lookup has no imdbRating attribute.
            raw_rating = getattr(full_movie, "imdbRating", None)
            rating = imdb_rating_to_itunes_bpm(raw_rating) if raw_rating else None
            if rating:
                print(f"Setting imdb rating (grouping) to {rating}")
                track.grouping.set(rating)

        # MPAA rating
        rated = getattr(full_movie, "Rated", None)
        if (
            not self.args.mpaa
            or not rated
            or rated in ("N/A", "Not Rated", "Unrated")
        ):
            return True
        location = track.location.get().path
        mp4info = Mp4Info(location)
        content_rating = mp4info.content_rating
        if not content_rating or rated != content_rating.content_rating:
            mp4info.content_rating = rated
        # # @itunes.refresh(track), @itunes.selection.refresh, @itunes.add(location) NONE WORK!
        return True
class LookupAPI:
    """Client for the OMDb API (omdbapi.com).

    Replies are decoded with every JSON object turned into a
    ``types.SimpleNamespace``, so fields are read as attributes
    (``reply.Search``, ``reply.Error``, ...).
    """

    def __init__(self):
        """Read the API key from ``~/.omdbapikey``.

        Raises:
            OSError: If the key file cannot be opened.
        """
        with open(os.path.expanduser("~/.omdbapikey")) as key_file:
            self.omdbapikey = key_file.read().strip()

    def data(self, query):
        """Send ``query`` (a dict of request parameters) and decode the reply.

        The API key is added to a copy of ``query``; the caller's dict is
        left untouched.
        """
        params = dict(query, apikey=self.omdbapikey)
        return json.loads(
            requests.get(url="http://www.omdbapi.com/", params=params).text,
            object_hook=lambda d: types.SimpleNamespace(**d),
        )

    def lookup_episode(self, season, episode, show=None, imdb_id=None, tvmaze_id=None):
        """Look up one episode by IMDb id if given, otherwise by show title.

        ``tvmaze_id`` is accepted only so the signature matches the TVmaze
        lookup functions; it is not used.
        """
        params = {"episode": episode, "season": season}
        if imdb_id:
            params["i"] = imdb_id
        else:
            params["t"] = show
        return self.data(params)

    def search(self, query, year=0):
        """Return the list of search results for ``query``.

        When ``year`` is given and nothing is found, the search is repeated
        once without the year. Returns ``[]`` when there are no results.
        """
        params = {"s": query}
        if year:
            params["y"] = str(year)
        objs = self.data(params)
        if hasattr(objs, "Error"):
            logging.error("ERROR: %r", objs.Error)
        if hasattr(objs, "Search"):
            return objs.Search
        if year:
            # no results found, try without a year
            return self.search(query)
        return []

    def full_movie_info(self, imdb_id):
        """Return the full record for the title with this IMDb id."""
        return self.data({"i": imdb_id})

    def search_by_name_and_year(self, movie_name, movie_year=0):
        """Search by title and keep results released within a year of ``movie_year``.

        Results without a year, and all results when ``movie_year`` is falsy,
        are kept. A result year given as a range (e.g. "2016–2020") is
        compared by its first four-digit year; results whose year cannot be
        parsed are dropped.
        """
        movies = []
        for movie_result in self.search(movie_name, movie_year) or []:
            result_year = getattr(movie_result, "Year", None)
            if not (movie_year and result_year):
                movies.append(movie_result)
                continue
            # check that the year tag in the file name matches with the release date,
            # otherwise not the movie we are looking for
            year_match = re.match(r"\s*(\d{4})", str(result_year))
            if not year_match:
                continue
            try:
                if abs(int(year_match.group(1)) - int(movie_year)) < 2:
                    movies.append(movie_result)
            except ValueError:
                # movie_year itself is not numeric; nothing can match it.
                pass
        return movies
class Mp4Info:
    """Read and write the iTunEXTC content-rating tag using AtomicParsley.

    Reads run synchronously. Writes are queued and executed one at a time on
    a single background thread shared by all instances; call ``finalize()``
    to wait for them.
    """

    # Queue of pending writes (namespaces with filename/rating); a None item
    # tells the worker to stop.
    things_to_wait_for = queue.Queue()
    # The background thread, created on the first write.
    worker = None

    def __init__(self, filename):
        """Remember the path of the file to inspect or modify."""
        self.filename = filename

    @staticmethod
    def _parse_content_rating(text):
        """Extract the rating tag from AtomicParsley ``-t`` output, or None."""
        match = re.search(r"com.apple.iTunes;iTunEXTC. contains: (.*)", text)
        if not match:
            return None
        logging.debug("match.group(1): %r", match.group(1))
        arr = match.group(1).split("|")
        logging.debug("Mp4Info.content_rating: %r", arr)
        # The value is "|"-separated; pad so a short value cannot raise
        # IndexError.
        arr += [""] * (3 - len(arr))
        return types.SimpleNamespace(
            **{"org": arr[0], "content_rating": arr[1], "code": arr[2]}
        )

    @property
    def content_rating(self):
        """Current rating as a namespace (org, content_rating, code), or None.

        Raises:
            subprocess.CalledProcessError: If AtomicParsley exits non-zero.
            OSError: If AtomicParsley cannot be started.
        """
        output = subprocess.check_output(["AtomicParsley", self.filename, "-t"])
        return Mp4Info._parse_content_rating(output.decode())

    @content_rating.setter
    def content_rating(self, rating):
        """Queue a write of ``rating`` and start the worker if needed."""
        print(f"Setting content rating to {rating}")
        Mp4Info.things_to_wait_for.put(
            types.SimpleNamespace(**{"filename": self.filename, "rating": rating})
        )
        if not Mp4Info.worker:
            logging.debug("making a worker thread")
            Mp4Info.worker = threading.Thread(target=Mp4Info.work)
            Mp4Info.worker.start()

    @staticmethod
    def finalize():
        """Wait for all queued writes to finish and stop the worker."""
        if not Mp4Info.things_to_wait_for.empty():
            print("Waiting for AtomicParsley to finish writing content ratings...")
        logging.debug("waiting for queue to be empty")
        if Mp4Info.worker:
            # The sentinel is queued after all real items, so the worker
            # drains everything before exiting.
            Mp4Info.things_to_wait_for.put(None)
            logging.debug("waiting for worker thread to be done")
            Mp4Info.worker.join()
            Mp4Info.worker = None

    @staticmethod
    def work():
        """Worker loop: run AtomicParsley for each queued item until None."""
        while True:
            item = Mp4Info.things_to_wait_for.get()
            try:
                logging.debug("Mp4Info.work: %r", item)
                if not item:
                    break
                subprocess.check_output(
                    [
                        "AtomicParsley",
                        item.filename,
                        "--overWrite",
                        "--contentRating",
                        item.rating,
                    ]
                )
                logging.debug("done with AtomicParsley")
            except (subprocess.CalledProcessError, OSError) as err:
                # One bad file must not kill the worker and strand the rest
                # of the queue.
                logging.error("AtomicParsley failed for %s: %s", item.filename, err)
            finally:
                Mp4Info.things_to_wait_for.task_done()
# Run the command-line tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()
-- AppleScript launcher (a separate file from the Python script above).
-- Tells Terminal to run the ratings adder via `do script`; the trailing
-- `exit 0` ends that shell once the script returns.
tell application "Terminal"
-- Bring Terminal to the front so the interactive prompts are visible.
activate
-- NOTE(review): currentTab is assigned but not used afterwards in this snippet.
set currentTab to do script "/Users/ark/bin/share/imdb_movie_ratings_adder; exit 0"
end tell
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment