Skip to content

Instantly share code, notes, and snippets.

@ali5h
Last active January 10, 2022 23:16
Show Gist options
  • Save ali5h/313795d59931c4d128d2 to your computer and use it in GitHub Desktop.
Creating IMDB Top 250 Collection in Plex Media Server
#!/usr/bin/python
"""
Downloads and creates a CSV file from IMDB Top 250 list
"""
from pyquery import PyQuery as pq
import csv
import datetime
# for UnicodeWriter
import codecs
import cStringIO
class UnicodeWriter:
    """
    A CSV writer that writes rows to file object "f" encoded in the
    requested encoding, buffering each row through an in-memory queue.
    (Python 2 csv cannot handle unicode directly, hence the transcoding.)
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Rows are first serialized into this in-memory buffer as UTF-8.
        self.queue = cStringIO.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        # Incremental encoder for the caller's target encoding.
        self.encoder = codecs.getincrementalencoder(encoding)()

    def writerow(self, row):
        # Serialize the row into the queue as UTF-8 bytes ...
        self.writer.writerow([cell.encode("utf-8") for cell in row])
        # ... then pull it back out, transcode it to the target encoding,
        # and push it to the real output stream.
        raw = self.queue.getvalue().decode("utf-8")
        self.stream.write(self.encoder.encode(raw))
        # Reset the queue so the next row starts from an empty buffer.
        self.queue.truncate(0)

    def writerows(self, rows):
        # Route every row through the transcoding path above.
        for one_row in rows:
            self.writerow(one_row)
# Scrape the IMDB Top 250 chart and pull out the title cells.
movies = pq('http://www.imdb.com/chart/top')(".chart .titleColumn")
anchors = movies("a")
urls = [a.attr("href") for a in anchors.items()]
titles = [a.text() for a in anchors.items()]
years = [info.text().strip('()') for info in movies(".secondaryInfo").items()]

# One CSV per day, named after today's date.
filename = "imdb-top250-%s.csv" % str(datetime.date.today())
f = open(filename, 'wb')
w = UnicodeWriter(f)

# Header row first, then one [url, title, year] row per chart entry.
imdb250 = [['imdburl', 'title', 'year']]
for i in range(movies.length):
    imdb250.append([urls[i], titles[i], years[i]])
w.writerows(imdb250)
f.close()
#!/usr/bin/python
"""
Creates a tag and tag all owned movies from CSV
v0.02
Best to back up database, but seems to work relatively well
"""
import os
import platform
import sqlite3
import re
import csv
import getopt
import sys
DRYRUN = True #When True, do not UPDATE the database (dry-run mode)
REPORT = False #When True, print a report of owned/unowned films
def db_exec(cursor, query):
    """Execute *query* on *cursor*, unless the global DRYRUN flag is set."""
    if DRYRUN:
        # Dry-run mode: swallow the statement instead of executing it.
        return
    cursor.execute(query)
def connect_db():
    """Connect to the Plex library database for the current platform.

    Returns:
        (connection, cursor) tuple for com.plexapp.plugins.library.db.
    Raises:
        RuntimeError: on an unrecognized platform.  (The original code
        fell through with ``conn`` unbound and died with a NameError.)
    """
    pf = platform.system()
    if pf == 'Darwin':
        homedir = os.path.expanduser("~")
        conn = sqlite3.connect(
            '%s/Library/Application Support/Plex Media Server/Plug-in Support/'
            'Databases/com.plexapp.plugins.library.db' % (homedir))
    elif pf == 'Linux':
        # Default path for the plexmediaserver system package.
        conn = sqlite3.connect(
            '/var/lib/plexmediaserver/Library/Application Support/'
            'Plex Media Server/Plug-in Support/Databases/'
            'com.plexapp.plugins.library.db')
    elif pf == 'Windows':
        localappdata = os.getenv('LOCALAPPDATA')
        conn = sqlite3.connect(
            '%s\\Plex Media Server\\Plug-in Support\\'
            'Databases\\com.plexapp.plugins.library.db' % (localappdata))
    else:
        raise RuntimeError("Unsupported platform: %s" % pf)
    cursor = conn.cursor()
    return (conn, cursor)
def close_db(cursor):
    """Close DB cursor (the connection itself is left open; caller owns it)."""
    cursor.close()
def generate_field_string(user_fields):
    """Return a new ``lockedFields=...`` string derived from *user_fields*.

    Adds field ids 15 and 16 (presumably the tag-related lock fields --
    confirm against the Plex schema) to whatever ids were already locked,
    de-duplicated and numerically sorted.

    Args:
        user_fields: existing DB value, e.g. "lockedFields=1|16", or
            empty/None.
    Returns:
        "lockedFields=<sorted '|'-joined ids>".

    Robustness fix: a truthy *user_fields* without an '=' made the
    original raise IndexError; it is now treated as having no locked ids.
    """
    existing = ""
    if user_fields and '=' in user_fields:
        existing = user_fields.split('=', 1)[1]
    locked_ids = set()
    if existing:
        locked_ids = set(int(x) for x in existing.split('|'))
    # Always lock 15 and 16; set() removes duplicates for free.
    locked_ids.update((15, 16))
    fields = '|'.join(str(x) for x in sorted(locked_ids))
    return "lockedFields=%s" % fields
def remove_tag(tags_collection, tag_to_remove):
    """Return the '|'-separated tag string with every occurrence of
    *tag_to_remove* filtered out."""
    kept = [tag for tag in tags_collection.split('|') if tag != tag_to_remove]
    return '|'.join(kept)
def update_metadata_items(cursor, metadata_id, tag_title, untag=False):
    """Update tags_collection and user_fields for one metadata item.

    Appends *tag_title* to the item's '|'-separated tag list (no-op if
    already present), or removes it when *untag* is True, and writes the
    locked-fields string produced by generate_field_string().
    """
    # Parameterized lookup -- the original interpolated metadata_id
    # straight into the SQL string.
    cursor.execute('SELECT tags_collection, user_fields '
                   'FROM metadata_items WHERE id=?', (metadata_id,))
    item = cursor.fetchone()
    tags_collection = item[0]
    user_fields = item[1]
    new_field_string = generate_field_string(user_fields)
    if tags_collection:
        if not untag:
            # Tag already applied?  Then there is nothing to do.
            for existing in tags_collection.split('|'):
                if existing == tag_title:
                    return
        tags_collection = "%s|%s" % (tags_collection, tag_title)
    else:
        tags_collection = tag_title
    if untag:
        # Undo mode: strip every occurrence of the tag (including the one
        # unconditionally appended just above).
        tags_collection = remove_tag(tags_collection, tag_title)
    # NOTE(review): db_exec() accepts only a raw SQL string, so the UPDATE
    # is still built by interpolation; a tag title containing a single
    # quote would break the statement (injection risk).  Fixing that
    # requires changing db_exec's interface to accept bind parameters.
    db_exec(cursor, "UPDATE metadata_items SET tags_collection='%s',"
            "user_fields='%s' WHERE id='%s'"
            % (tags_collection, new_field_string, metadata_id))
def map_tag_to_metadata(cursor, tag_id, metadata_item_id, untag=False):
    """Create a taggings row linking tag -> metadata item if missing;
    delete it instead when *untag* is True and the row exists."""
    # Parameterized lookup -- the original interpolated both ids into the
    # SQL string (and compared integer columns against quoted strings).
    cursor.execute('SELECT id FROM taggings '
                   'WHERE tag_id=? and metadata_item_id=?',
                   (tag_id, metadata_item_id))
    item = cursor.fetchone()
    if not item:
        # No mapping yet: insert one (db_exec honors DRYRUN).
        db_exec(cursor, 'INSERT into taggings (metadata_item_id, tag_id,'
                'created_at) VALUES ("%s", "%s", datetime("now"))'
                % (metadata_item_id, tag_id))
    elif untag:
        # Mapping exists and caller asked to undo it.
        db_exec(cursor, 'DELETE from taggings WHERE '
                'metadata_item_id=%s and tag_id=%s'
                % (metadata_item_id, tag_id))
def insert_new_tag(cursor, title):
    """Insert new tag (tag_type 2) and return the cursor's lastrowid.

    NOTE(review): *title* is interpolated into the SQL string, so a title
    containing a single quote breaks the statement (injection risk).
    NOTE(review): in DRYRUN mode db_exec() skips the INSERT, so the
    returned lastrowid does not refer to a newly created tag.
    """
    db_exec(cursor, "INSERT into tags (tag, tag_type, created_at, updated_at) "
            "VALUES ('%s', '2', datetime('now'), datetime('now'))" % (title))
    return cursor.lastrowid
def create_tag(cursor, title):
    """Return the id of the tag named *title* (tag_type 2), inserting it
    via insert_new_tag() when it does not exist yet."""
    # Parameterized query -- the original interpolated *title* into the
    # SQL string, which is an injection risk for arbitrary tag titles.
    cursor.execute('SELECT id, tag, tag_type FROM tags '
                   'WHERE tag_type = 2 and tag = ?', (title,))
    item = cursor.fetchone()
    if item:
        # Tag already exists in the database; reuse its id.
        tag_id = item[0]
    else:
        # Tag needs to be inserted.
        tag_id = insert_new_tag(cursor, title)
    return tag_id
def find_imdb_id(imdb_url):
    """Extract an IMDB id ("tt" + first digit run) from *imdb_url*.

    Returns False when the url contains no digits at all.
    """
    digits = re.search(r'(\d+)', imdb_url)
    if digits is None:
        return False
    return "tt" + digits.group(1)
def fetch_film_ids(cursor, csv_filename):
    """Get metadata ids for all films from Plex that appear in ICM CSV file

    Reads the CSV (expects 'imdburl', 'title' and 'year' columns), extracts
    IMDB ids from the urls, then matches them against the guid column of
    every metadata_type=1 item (presumably movies) in the Plex DB.
    Prints an owned/unowned report when the REPORT global is set.
    Returns the list of matching metadata_items ids, or an error string
    when no filename was given.

    NOTE(review): Python 2 only ('rb' text mode + reader.next() + print
    statements).
    """
    csv_imdb_ids = []
    plex_imdb_ids = []
    metadata_ids = []
    title_lookup = {}      # imdb_id -> "Title (Year)" for report output
    if not csv_filename:
        return "Specify csv!"
    else:
        fhandle = open(csv_filename, 'rb')
        # First row is the header; feed it to DictReader as field names.
        reader = csv.reader(fhandle)
        titles = reader.next()
        reader = csv.DictReader(fhandle, titles)
        for row in reader:
            imdburl = row['imdburl']
            imdb_id = find_imdb_id(imdburl)
            if imdb_id:
                csv_imdb_ids.append(imdb_id)
                title_lookup[imdb_id] = row['title'] + " (" + row['year'] + ")"
            else:
                print "ERROR: Unable to find IMDB ID for %s" % (row['title'])
                continue
        fhandle.close()
    cursor.execute('SELECT id, title, user_fields, guid '
                   'FROM metadata_items WHERE metadata_type=1')
    films = cursor.fetchall()
    #go through all films in database and try to match from CSV
    for item in films:
        imdburl = item[3]          # guid column holds the agent url
        imdb_id = find_imdb_id(imdburl)
        if imdb_id in csv_imdb_ids:
            #Keep ID lookup for optional reporting
            plex_imdb_ids.append(imdb_id)
            #matched movie from csv->db, so save metadata id
            metadata_ids.append(item[0])
    if REPORT:
        # Films in the CSV that were not matched in the Plex DB.
        unowned_films = list(set(csv_imdb_ids) - set(plex_imdb_ids))
        if unowned_films:
            print "-------"
            print "%d of %d films owned" % (len(plex_imdb_ids),
                                            len(csv_imdb_ids))
            print "%d unowned films:" % (len(unowned_films))
            for imdb_id in unowned_films:
                print "\t" + title_lookup[imdb_id]
        print "-------"
        print "%d films owned:" % (len(plex_imdb_ids))
        for imdb_id in plex_imdb_ids:
            print "\t" + title_lookup[imdb_id]
        print "-------"
    return metadata_ids
def display_usage():
    """ Display usage information and exit with status 2 (usage error). """
    # Derive the invoked script name so the help text matches however the
    # user launched it.
    script_name = os.path.basename(__file__)
    print """Usage: %s [options]
Options:
-f CSVFILENAME, --file=CSVFILENAME ICM top list CSV export [REQUIRED]
-t "TAG TITLE", --tag="TAG TITLE" Name of collection tag [REQUIRED]
-x, --execute Commit changes to database, else dry run
-r, --report Print list of films to tag and missing films
-u, --untag Remove "TAG TITLE" from all listed films, essentially an undo
-h, --help Show this help message and exit
Example:
%s -f 500+essential+cult+movies.csv -t "500 Essential Cult Movies" -x -r
The above will tag all owned movies from the .csv file with the tag "500 Essential Cult Movies", commit changes to database and print a report
This new collection can then be filtered in Plex
""" % (script_name, script_name)
    sys.exit(2)
def main():
    """Parse CLI options, then tag every owned film from the CSV.

    Options: -f/--file (CSV, required), -t/--tag (tag title, required),
    -x/--execute (disable DRYRUN), -r/--report, -u/--untag, -h/--help.
    """
    tag_title = ""
    csv_filename = ""
    untag = False
    try:
        opts, dummy = getopt.getopt(
            sys.argv[1:], "f:t:xruh",
            ["file=", "tag=", "execute", "report", "untag", "help"])
    except getopt.GetoptError:
        # Show the help text instead of exiting silently (still exits 2).
        display_usage()
    for opt, arg in opts:
        if opt in ("-f", "--file"):
            csv_filename = arg
        elif opt in ("-t", "--tag"):
            tag_title = arg
        elif opt in ("-x", "--execute"):
            # Commit mode: let db_exec actually run its statements.
            global DRYRUN
            DRYRUN = False
        elif opt in ("-r", "--report"):
            global REPORT
            REPORT = True
        elif opt in ("-u", "--untag"):
            untag = True
        elif opt in ("-h", "--help"):
            display_usage()
    if not csv_filename or not tag_title:
        display_usage()
    conn, cursor = connect_db()
    metadata_ids = fetch_film_ids(cursor, csv_filename)
    tag_id = create_tag(cursor, tag_title)
    for meta_id in metadata_ids:
        map_tag_to_metadata(cursor, tag_id, meta_id, untag)
        update_metadata_items(cursor, meta_id, tag_title, untag)
    conn.commit()
    close_db(cursor)


if __name__ == '__main__':
    # Guard so importing this module does not run the tagger.
    main()
@vjbonan
Copy link

vjbonan commented Oct 13, 2015

How do I install this / execute this on ubuntu?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment