Skip to content

Instantly share code, notes, and snippets.

@d8ahazard
Last active June 22, 2023 17:49
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save d8ahazard/27caeb96449eb74db902d3ef9f4c0d2f to your computer and use it in GitHub Desktop.
Save d8ahazard/27caeb96449eb74db902d3ef9f4c0d2f to your computer and use it in GitHub Desktop.
A utility for cleaning Plex collections and identifying orphaned media.
import os
import pathlib
import sqlite3
# Module-level state shared by the menu actions below.
# Plex data directory chosen by find_db(); the empty string means "not found".
target_path = ""
# movie_list and tv_list are rebuilt by find_orphans() on every scan;
# collection_list is not referenced by any function visible in this file.
movie_list, tv_list, collection_list = [], [], []
def find_db():
    """Locate the Plex Media Server data directory and store it in ``target_path``.

    A list of well-known install locations is probed in order, followed by
    locations derived from the ``LOCALAPPDATA``, ``PLEX_HOME`` and
    ``JAIL_ROOT`` environment variables when they are set. The first
    candidate that exists is stored in the module-level ``target_path``.

    The stored value always ends with a path separator, because callers build
    the database path by plain string concatenation onto ``target_path``.
    If no candidate exists, ``target_path`` is left unchanged.
    """
    global target_path
    paths = [
        "E:/PMSData/Plex Media Server/",
        "~/Library/Application Support/Plex Media Server/",
        "/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/",
        "/volume1/Plex/Library",
        "/Volume1/Plex/Library/Application Support/Plex Media Server/",
        "/usr/local/plexdata/Plex Media Server/",
        "/Internal/Android/data/com.plexapp.mediaserver.smb/Plex Media Server/",
        "/home/plex/Library/Application Support/Plex Media Server",
        "/raid/data/module/Plex/sys/Plex Media Server/",
        "/mnt/HD/HD_a2/plex_conf/Plex Media Server/",
        "/shares/Storage/.wdcache/.plexmediaserver/Application Support/Plex Media Server/",
    ]
    env_candidates = (
        ("LOCALAPPDATA", "/Plex Media Server/"),
        ("PLEX_HOME", "/Library/Application Support/Plex Media Server/"),
        ("JAIL_ROOT", "/var/db/plexdata/Plex Media Server/"),
    )
    for env_name, suffix in env_candidates:
        env_value = os.getenv(env_name)
        if env_value is not None:
            paths.append(env_value + suffix)

    already_checked = set()
    for raw_path in paths:
        # os.path.exists() does not expand "~", so the home-relative candidate
        # could never match without this.
        db_path = os.path.expanduser(raw_path)
        # Guarantee a trailing separator so `target_path + "Plug-in Support/..."`
        # in the callers yields a valid path for every candidate.
        if not db_path.endswith(("/", "\\")):
            db_path += "/"
        if db_path in already_checked:
            continue
        already_checked.add(db_path)

        pprint("")
        pprint("Checking path {}".format(db_path))
        if os.path.exists(db_path):
            target_path = db_path
            pprint("We have the path! " + db_path)
            pprint("")
            break
        pprint("No path found.")
        pprint("")
def _is_media_file(file_name, file_extensions, extra_media_tags):
    """Return True if *file_name* has a known media extension and no "extra" tag."""
    if any(tag in file_name for tag in extra_media_tags):
        return False
    return pathlib.Path(file_name).suffix.lower() in file_extensions


def _scan_section(section, file_extensions, extra_media_tags):
    """Collect media files below *section* and its media-less top-level folders.

    Returns a ``(media_files, empty_dirs)`` tuple. ``media_files`` holds the
    full path of every media file found at any depth (including directly in
    the section root). ``empty_dirs`` holds each immediate sub-directory of
    *section* that contains no media file at any depth.
    """
    media_files = []
    empty_dirs = []
    try:
        # Only the first os.walk() result (the section root itself) is needed
        # to list the top-level folders; no need to traverse the tree twice.
        _root, children, root_files = next(os.walk(section))
    except StopIteration:
        # os.walk() yields nothing for a missing or unreadable directory.
        return media_files, empty_dirs

    media_files.extend(
        os.path.join(section, name)
        for name in root_files
        if _is_media_file(name, file_extensions, extra_media_tags)
    )
    for child in children:
        child_path = os.path.join(section, child)
        found = [
            os.path.join(dir_path, name)
            for dir_path, _dir_names, file_names in os.walk(child_path)
            for name in file_names
            if _is_media_file(name, file_extensions, extra_media_tags)
        ]
        if found:
            media_files.extend(found)
        else:
            empty_dirs.append(child_path)
    return media_files, empty_dirs


def _path_key(path):
    """Normalise *path* so the same location compares equal across spellings."""
    return os.path.normcase(os.path.normpath(path))


def find_orphans():
    """Report movie entries whose file is missing on disk, plus media-less folders.

    Steps:
      1. Locate the Plex data directory via ``find_db()``.
      2. Read every media part from the Plex library database and rebuild the
         module-level ``movie_list`` (metadata_type 1) and ``tv_list``
         (metadata_type 4).
      3. For each library section seen in step 2, rewrite ``collectionMode=2``
         to ``collectionMode=0`` in its ``user_fields`` and remember its root
         path.
      4. Scan the section roots on disk and print the database movies whose
         file was not found, followed by the top-level folders without media.

    The function pauses for RETURN and then returns to its caller; it does not
    re-enter ``main_menu()`` itself, so repeated use does not grow the call
    stack.
    """
    find_db()
    global movie_list
    global tv_list
    file_extensions = [".avi", ".mkv", ".mp4", ".m2ts", ".ts", ".mov", ".wmv", ".3gpp", ".flv", ".mpeg", ".mpg"]
    extra_media_tags = ["-behindthescenes", "-deleted", "-featurette", "-interview", "-scene", "-short", "-trailer",
                        "-other"]
    if target_path != "":
        movie_list = []
        tv_list = []
        media_files = []
        section_ids = set()
        sections = []
        empty_dirs = []
        db_path = target_path + "Plug-in Support/Databases/com.plexapp.plugins.library.db"
        conn = sqlite3.connect(db_path)
        try:
            c = conn.cursor()
            c.execute("""select media_parts.file, metadata_items.title, metadata_items.metadata_type, metadata_items.id,
            metadata_items.library_section_id from media_parts inner join media_items on
            media_parts.media_item_id=media_items.id inner join metadata_items on
            media_items.metadata_item_id=metadata_items.id;""")
            for (file, title, metadata_type, meta_id, section_id) in c.fetchall():
                if metadata_type == 4:
                    tv_list.append({"file": file, "title": title, "id": meta_id})
                if metadata_type == 1:
                    movie_list.append({"file": file, "title": title, "id": meta_id})
                section_ids.add(section_id)
            c.execute("""select sl.library_section_id, sl.root_path, ls.user_fields from section_locations as sl
            inner join library_sections as ls on ls.id = sl.library_section_id""")
            for (section_id, location, settings) in c.fetchall():
                if section_id not in section_ids:
                    continue
                # user_fields may be NULL; only rewrite when the flag is present.
                if settings and "collectionMode=2" in settings:
                    # str.replace() returns a new string; the result must be
                    # kept or the UPDATE below writes the old value back.
                    settings = settings.replace("collectionMode=2", "collectionMode=0")
                    pprint("Updating settings for section {} to disable collection recreation.".format(section_id))
                    c.execute("UPDATE library_sections SET user_fields=? WHERE id=?", (settings, section_id))
                    conn.commit()
                sections.append(location)
        finally:
            # Release the database even if a query fails.
            conn.close()

        for section in sections:
            pprint("Checking for files in {}".format(section))
            section_media, section_empty = _scan_section(section, file_extensions, extra_media_tags)
            media_files.extend(section_media)
            empty_dirs.extend(section_empty)

        # Set lookup instead of scanning a list once per movie.
        files_on_disk = {_path_key(path) for path in media_files}
        movie_orphans = [
            movie for movie in movie_list
            if not movie["file"] or _path_key(movie["file"]) not in files_on_disk
        ]
        # tv_list is collected above but episodes are not checked for orphans.

        pprint("")
        # NOTE(review): despite the heading, this lists movies that ARE in the
        # Plex database but whose file was not found on disk - confirm wording.
        pprint("Movies not in Plex Database:\n")
        pprint("Path, title")
        for plex_orphan in movie_orphans:
            pprint("{}, {}".format(plex_orphan["file"], plex_orphan["title"]))
        pprint("")
        pprint("Empty directories:\n")
        pprint("Path")
        for empty_dir in empty_dirs:
            pprint(empty_dir)
        pprint("")
    else:
        pprint("DB Path not found!")
    input(" PRESS [RETURN] TO CONTINUE.")
def main_menu():
    """Show the interactive main menu and dispatch the selected action.

    The menu is shown repeatedly until the user submits an empty line (or
    stdin is closed), in which case the function returns, or chooses option 4,
    which terminates the program by raising ``SystemExit``.
    """
    clear()
    ans = True
    while ans:
        try:
            ans = input("""
GREETINGS PROFESSOR FALKEN, SHALL WE PLAY A GAME?
1. FIND MISSING MEDIA
2. COLLECTION CLEANUP
3. GLOBAL THERMONUCLEAR WAR
4. TERMINATE APPLICATION
""")
        except EOFError:
            # stdin was closed: nothing more can be read, so leave the menu.
            return
        if ans == "1":
            clear()
            find_orphans()
        elif ans == "2":
            clear()
            clean_collections()
        elif ans == "3":
            clear()
            pprint("\nAre you out of your damned mind?")
            input("\nPRESS [RETURN] TO CONTINUE")
            # Clear and fall through to the next loop iteration instead of
            # calling main_menu() again, which would nest one call per use.
            clear()
        elif ans == "4":
            clear()
            # quit() is a helper injected by the `site` module for interactive
            # sessions; raising SystemExit works in every environment.
            raise SystemExit
        elif ans != "":
            pprint("\nINVALID ENTRY.")
def clean_collections():
    """Ask for the minimum collection size and run the collection cleanup.

    An empty answer selects the default of 2. The answer is parsed as an
    integer (the prompt repeats on non-numeric input). Values of 2 or more
    run ``clean_query`` with that number; smaller values do nothing.

    The function returns to its caller when done instead of re-entering
    ``main_menu()``.
    """
    while True:
        ans = input("""\n What is the minimum number of items to have in a collection? (2)""")
        if ans == "":
            ans = "2"
        try:
            # Compare numerically: as strings, "10" >= "2" is False and
            # "abc" >= "2" is True.
            minimum = int(ans)
        except ValueError:
            pprint("\nINVALID ENTRY.")
            continue
        break

    if minimum >= 2:
        clear()
        pprint("\nCleaning useless collections.\n")
        clean_query(minimum)
        clear()
    else:
        # Left on screen (no clear) so the user can actually read it.
        pprint("\nNothing to do.")
def clean_query(count):
    """Hide collections smaller than *count* and re-show those that reach it.

    Reads every ``metadata_type=18`` row from the Plex library database. The
    item count is taken from the value of the first ``key=value`` pair of
    ``extra_data``. Rows below *count* that are not yet marked with
    ``collectionMode=0`` get ``&pr%3AcollectionMode=0`` appended; rows at or
    above *count* that carry the marker have it removed. Rows whose
    ``extra_data`` is empty or cannot be parsed are skipped.

    The changes are applied only after the user confirms with ``y``. If an
    update fails, the pending changes are rolled back rather than partially
    committed.

    Args:
        count: minimum number of items a collection needs to stay visible.
    """
    find_db()
    items_to_remove = []
    items_to_show = []
    if target_path != "":
        db_path = target_path + "Plug-in Support/Databases/com.plexapp.plugins.library.db"
        conn = sqlite3.connect(db_path)
        try:
            c = conn.cursor()
            c.execute(""" select title, extra_data, id from metadata_items where metadata_type=18 order by title asc""")
            pprint("Items to remove:\n")
            pprint("title, child_count")
            for (title, extra, meta_id) in c.fetchall():
                fields = (extra or "").split("&")[0].split("=")
                try:
                    child_count = fields[1]
                    child_total = int(child_count)
                except (IndexError, ValueError):
                    # No usable count in extra_data: leave this row untouched.
                    continue
                child_item = {"title": title, "extra": extra, "id": meta_id, "count": child_count}
                if child_total < count and "collectionMode=0" not in extra:
                    items_to_remove.append(child_item)
                elif child_total >= count and "collectionMode=0" in extra:
                    items_to_show.append(child_item)
            if items_to_remove:
                pprint("Items to hide:\n")
                pprint("title, count")
                for remove in items_to_remove:
                    pprint("{}, {}".format(remove["title"], remove["count"]))
            if items_to_show:
                pprint("\n\nItems to show:\n")
                pprint("title, count")
                for show in items_to_show:
                    pprint("{}, {}".format(show["title"], show["count"]))
            if items_to_remove or items_to_show:
                confirmation = input("About to show/hide the following collections. Press y to continue (y/n)")
                if confirmation == "y":
                    # Presumably the index relies on a collation this sqlite3
                    # build lacks, and removing the migration row makes Plex
                    # rebuild it on restart - TODO confirm.
                    try:
                        c.execute("DROP index 'index_title_sort_naturalsort'")
                        c.execute("DELETE from schema_migrations where version='20180501000000'")
                    except sqlite3.OperationalError:
                        pprint("Nothing to see here...")
                    conn.commit()
                    pprint("Updating collections to show.")
                    try:
                        for remove in items_to_remove:
                            extra = remove["extra"] + "&pr%3AcollectionMode=0"
                            c.execute("UPDATE metadata_items SET extra_data=? WHERE id=?", (extra, remove["id"]))
                        for show in items_to_show:
                            extra = show["extra"].replace("&pr%3AcollectionMode=0", "")
                            c.execute("UPDATE metadata_items SET extra_data=? WHERE id=?", (extra, show["id"]))
                    except sqlite3.OperationalError:
                        # Do not keep a half-applied batch or claim success.
                        conn.rollback()
                        pprint("Error executing query, try repairing your DB.")
                    else:
                        conn.commit()
                        pprint("Cleanup completed.\n")
                        pprint("Please restart Plex to rebuild the 'natural sort' table.\n")
                else:
                    pprint("Cleanup canceled.")
            else:
                pprint("No collections found with less than {} items.".format(count))
        finally:
            # Always release the database handle, even when a query raises.
            conn.close()
    else:
        pprint("DB Path not found!")
    input(" PRESS [RETURN] TO CONTINUE.")
def clear():
    """Clear the terminal by running the platform's screen-clearing command."""
    if os.name == 'nt':
        command = 'cls'
    else:
        command = 'clear'
    os.system(command)
def pprint(str_input):
    """Print *str_input* prefixed with a single space.

    *str_input* must be a ``str``; it is concatenated, not formatted.
    """
    indented = " " + str_input
    print(indented)
# Start the interactive menu only when run as a script, so importing this
# module (e.g. to reuse find_db) does not block on user input.
if __name__ == "__main__":
    main_menu()
@d8ahazard
Copy link
Author

Written in Python 3.7, tested with Windows Server 2019.

Designed to be as cross-platform friendly as possible, but any changes/fixes are welcome.

@RoBo-OnGitHub
Copy link

Thanks d8ahazard, for showing your solution(s).

As an addition to your script, I can let you know that for the latest PMS (Plex Media Server) on Synology the path is probably: "/volume1/PlexMediaServer/AppData/Plex Media Server/".
Furthermore, if you want more users to use the script, a bit more explanation might be required. Option 1 is to find stuff on your drive that might not be in PMS; Option 2 is .. eh? Don't know ;-)

Since PMS is using a lot of disk space (ca. 35GB, of which ca. 5GB is the database), I wanted to find out whether all that disk space is really being used (so that I could remove whatever isn't from the folders). But as this hasn't been one of your problems to solve, I need to search a bit more. Of course, I'm thankful for any hints.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment