Last active
October 29, 2021 07:31
-
-
Save nwithan8/b0226148feda8ba2a42213590a800e8e to your computer and use it in GitHub Desktop.
Add all items from a Plex Media Server library section to a Radarr instance.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Add all items from a Plex Media Server library section to a Radarr instance. | |
Use this to mass import a Plex movie section into Radarr in the event of Radarr reset or database corruption. | |
Copyright 2020 - Nate Harris | |
""" | |
""" | |
Setup: ``pip install requests progress plexapi`` | |
""" | |
import json | |
import argparse | |
from urllib.parse import urlencode | |
from typing import List, Union | |
import re | |
import requests | |
from plexapi.library import ShowSection, MovieSection, PhotoSection, MusicSection | |
from plexapi.server import PlexServer | |
from progress.bar import Bar | |
def _build_argument_parser():
    """Create the command-line parser for the Plex-to-Radarr import script."""
    cli = argparse.ArgumentParser()

    # Required positional arguments, listed in the order they are registered.
    positionals = (
        ('plex_url', "URL of Plex Media Server (including port)"),
        ('plex_token', "Your Plex Media Server access token"),
        ('plex_section_name', "Name of the Plex section to import into Radarr"),
        ('radarr_url', "URL of Radarr instance (including port)"),
        ('radarr_api_key', "API key for Radarr"),
        ('radarr_root_folder', "Path to root folder for Radarr"),
        ('quality_profile_name', "Quality profile to use when adding items"),
    )
    for arg_name, arg_help in positionals:
        cli.add_argument(arg_name, type=str, help=arg_help)

    # Optional on/off switches.
    switches = (
        ('-m', '--monitor', "Monitor new items being added."),
        ('-v', '--verbose', "Verbose logging."),
    )
    for short_flag, long_flag, flag_help in switches:
        cli.add_argument(short_flag, long_flag, action='store_true', help=flag_help)

    return cli


parser = _build_argument_parser()
# Parsed once at import time; the rest of the script reads options from ``args``.
args = parser.parse_args()
def extract_agent_type_and_id(guid_string: str):
    """Split a Plex GUID into its metadata agent name and agent-specific ID.

    The agent is the text between the last ``.`` before ``://`` and the
    ``://`` itself (``com.plexapp.agents.imdb://...`` -> ``imdb``). The ID is
    the text after ``://``, with a trailing ``?query`` removed when present.

    :param guid_string: GUID string of a Plex item.
    :return: ``(agent, agent_id)``; either element is ``None`` when it cannot
        be parsed. GUIDs whose scheme contains no ``.`` (for example
        ``plex://movie/...``) yield ``None`` for the agent.
    """
    if not guid_string:
        # Nothing to parse; avoids a TypeError from re.search on None.
        return None, None

    # Prefer the form with a "?query" suffix; fall back to GUIDs that have no
    # query string at all so their ID is not lost.
    id_match = (re.search(r"://(.*)\?", guid_string)
                or re.search(r"://(.+)", guid_string))
    agent_match = re.search(r"^.*\.(.*)://", guid_string)

    agent = agent_match.group(1) if agent_match else None
    agent_id = id_match.group(1) if id_match else None
    return agent, agent_id
def _translate_profile(user_indicated_profile: str = None): | |
if user_indicated_profile == 'announced': | |
return 'announced' | |
elif user_indicated_profile == 'in_cinemas': | |
return 'inCinemas' | |
elif user_indicated_profile == 'predb': | |
return 'preDB' | |
else: | |
return 'released' | |
class Plex:
    """Small wrapper around a ``PlexServer`` connection."""

    def __init__(self, url, token):
        """Store the connection details and open the server connection."""
        self.url, self.token = url, token
        self.server = PlexServer(url, token)

    def get_all_library_sections(self):
        """Return every library section reported by the server."""
        library = self.server.library
        return library.sections()

    def get_specific_section(self, section_name: str):
        """Return the first section whose title equals ``section_name``, or ``None``."""
        matching = (candidate for candidate in self.get_all_library_sections()
                    if candidate.title == section_name)
        return next(matching, None)

    def get_all_section_items(self, section: Union[ShowSection, MovieSection, PhotoSection, MusicSection]):
        """Return all items contained in ``section``."""
        items = section.all()
        return items
class Radarr:
    """Minimal client for the Radarr HTTP endpoints used by this script.

    Every request goes to ``{base_url}/api{endpoint}`` with the API key passed
    as the ``apikey`` query parameter. Verbose output is controlled by the
    module-level ``args.verbose`` flag.
    """

    # Profile ID used only when no quality profile ID is available at all;
    # this preserves the value the script previously always sent.
    DEFAULT_PROFILE_ID = 6

    def __init__(self, base_url: str, api_key: str, movies_root_folder_path: str):
        """Store connection settings.

        :param base_url: Radarr base URL; a trailing ``/`` is stripped.
        :param api_key: Radarr API key.
        :param movies_root_folder_path: root folder used for added movies.
        """
        self.url = base_url.rstrip('/')
        self.key = api_key
        self.rootFolderPath = movies_root_folder_path

    def _build_url(self, endpoint: str, params: dict = None) -> str:
        """Return the full request URL for ``endpoint``, including the API key and ``params``."""
        url = f"{self.url}/api{endpoint}?apikey={self.key}"
        if params:
            url += f"&{urlencode(params)}"
        return url

    def _get(self, endpoint: str, params: dict = None):
        """Issue a GET request.

        :return: the decoded JSON body when the response object is truthy,
            otherwise ``False``.
        """
        url = self._build_url(endpoint, params)
        if args.verbose:
            # NOTE(review): the URL contains the API key, so verbose output exposes it.
            print(f"GET {url}")
        response = requests.get(url=url)
        if args.verbose:
            print(f"Response: {response}")
        if response:
            return response.json()
        return False

    def _post(self, endpoint: str, params: dict = None, data: dict = None):
        """Issue a POST request with ``data`` sent as the JSON body.

        :return: the decoded JSON body when the response object is truthy.
            Otherwise the error messages from the response (or its raw text
            when they cannot be extracted) are printed and ``False`` is
            returned.
        """
        url = self._build_url(endpoint, params)
        if args.verbose:
            print(f"POST {url}, data: {data}")
        response = requests.post(url=url,
                                 json=data)
        if args.verbose:
            print(f"Response: {response}")
        if response:
            return response.json()
        try:
            response_data = response.json()
            for error_data in response_data:
                if error_data.get('errorMessage'):
                    print(error_data['errorMessage'])
        except (ValueError, AttributeError, TypeError):
            # Body is not JSON, or not a list of error objects: show it raw.
            print(response.text)
        return False

    @property
    def quality_profiles(self):
        """Quality profiles returned by ``/qualityProfile``, or ``None`` when unavailable."""
        return self._get(endpoint='/qualityProfile') or None

    def get_quality_profile_number(self, quality_profile_name: str) -> Union[int, None]:
        """Return the ``id`` of the profile named ``quality_profile_name``, or ``None`` if not found."""
        for profile_data in self.quality_profiles or []:
            if profile_data.get('name') == quality_profile_name:
                return profile_data.get('id')
        return None

    def find_movie(self, xmdb_id: str = None, xmdb_type: str = None, title: str = None):
        """Look up a movie by metadata-agent ID or by title.

        An ID lookup is used when both ``xmdb_id`` and ``xmdb_type`` are
        given; otherwise a title search is used when ``title`` is given.

        :param xmdb_id: IMDb or TMDb identifier.
        :param xmdb_type: ``'imdb'``, ``'tmdb'`` or ``'themoviedb'``.
        :param title: search term used when no usable ID/agent pair is given.
        :return: the movie details (first item when the lookup returns a
            list), or ``None`` when nothing was found, the agent is not
            supported, or no lookup could be performed.
        :raises Exception: when neither ``title`` nor ``xmdb_id`` is provided.
        """
        if not title and not xmdb_id:
            raise Exception("Must provide either movie imdb_id or title.")

        # Initialised so that "ID without agent type and without title" yields
        # None instead of an UnboundLocalError.
        details = None
        if xmdb_id and xmdb_type:
            if xmdb_type == 'imdb':
                details = self._get(endpoint='/movie/lookup/imdb',
                                    params={'imdbId': xmdb_id})
            elif xmdb_type in ['tmdb', 'themoviedb']:
                details = self._get(endpoint='/movie/lookup/tmdb',
                                    params={'tmdbId': xmdb_id})
            else:
                print(f"{xmdb_type} is not a supported metadata agent. Please use IMDb or TMDb.")
                return None
        elif title:
            details = self._get(endpoint='/movie/lookup',
                                params={'term': title})

        if not details:
            return None
        if isinstance(details, list):
            details = details[0]  # first item assumed to be best match
        return details

    def add_movie(self,
                  title: str,
                  qualityProfileId: int,
                  titleSlug: str,
                  images,
                  tmdbId: int,
                  minimumAvailabilityProfile: str,
                  year: int,
                  rootFolderPath: str,
                  monitored: bool = False,
                  search: bool = False
                  ):
        """POST a new movie to ``/movie``.

        :param qualityProfileId: quality profile to assign; also sent as
            ``profileId`` so the two fields cannot disagree.
        :param minimumAvailabilityProfile: sent as ``minimumAvailability``
            when truthy.
        :param monitored: when true, ``monitored`` is set in the payload.
        :param search: when true, asks Radarr to search for the movie.
        :return: ``True`` when the POST returned a truthy body, else ``False``.
        """
        # NOTE(review): profileId used to be hard-coded to 6, which looks like
        # it could override the requested quality profile - confirm against
        # the Radarr API version in use.
        profile_id = qualityProfileId if qualityProfileId is not None else self.DEFAULT_PROFILE_ID
        data = {'title': title,
                'qualityProfileId': qualityProfileId,
                'titleSlug': titleSlug,
                'images': images,
                'tmdbId': tmdbId,
                'profileId': profile_id,
                'year': year,
                'rootFolderPath': rootFolderPath
                }
        if minimumAvailabilityProfile:
            # Previously accepted but never forwarded to Radarr.
            data['minimumAvailability'] = minimumAvailabilityProfile
        if monitored:
            data['monitored'] = True
        if search:
            data['addOptions'] = {'searchForMovie': True}
        response_data = self._post(endpoint='/movie',
                                   data=data)
        return bool(response_data)

    def add_movie_by_xmdb_id_or_title(self,
                                      xmdb_id: str = None,
                                      xmdb_type: str = None,
                                      title: str = None,
                                      monitored: bool = False,
                                      search: bool = False,
                                      quality_profile_id: int = None,
                                      minimum_availability_profile: str = None):
        """Look up a movie and add it to Radarr.

        :param quality_profile_id: profile to assign; when falsy, the
            ``qualityProfileId`` from the lookup result is used instead.
        :param minimum_availability_profile: keyword translated with
            ``_translate_profile`` before being passed on.
        :return: the result of ``add_movie``, or ``False`` when the lookup
            found nothing.
        """
        movie_details = self.find_movie(xmdb_id=xmdb_id,
                                        xmdb_type=xmdb_type,
                                        title=title)
        if not movie_details:
            return False

        chosen_profile = (quality_profile_id if quality_profile_id
                          else movie_details.get('qualityProfileId'))
        return self.add_movie(title=movie_details['title'],
                              qualityProfileId=chosen_profile,
                              titleSlug=movie_details['titleSlug'],
                              images=movie_details['images'],
                              tmdbId=movie_details['tmdbId'],
                              minimumAvailabilityProfile=_translate_profile(
                                  user_indicated_profile=minimum_availability_profile),
                              year=movie_details['year'],
                              rootFolderPath=self.rootFolderPath,
                              monitored=monitored,
                              search=search)
# --- Script entry: import every item of one Plex section into Radarr. ---
plex = Plex(url=args.plex_url,
            token=args.plex_token)
radarr = Radarr(base_url=args.radarr_url,
                api_key=args.radarr_api_key,
                movies_root_folder_path=args.radarr_root_folder)

quality_profile_number = radarr.get_quality_profile_number(quality_profile_name=args.quality_profile_name)
if quality_profile_number is None:
    # Not fatal: each add falls back to the profile ID from its lookup result.
    print(f"Quality profile '{args.quality_profile_name}' was not found in Radarr; "
          f"using the profile returned by each movie lookup instead.")

plex_section = plex.get_specific_section(args.plex_section_name)
if plex_section is None:
    # Without this guard the script fails later with an AttributeError on None.
    raise SystemExit(f"Plex section '{args.plex_section_name}' was not found.")

all_section_items = plex.get_all_section_items(section=plex_section)
with Bar('Adding to Radarr', max=len(all_section_items)) as bar:
    for plex_item in all_section_items:
        agent, agent_id = extract_agent_type_and_id(guid_string=plex_item.guid)
        if args.verbose:
            print(f"Parsed metadata: {agent_id} on {agent}")
        if agent and agent_id:
            radarr.add_movie_by_xmdb_id_or_title(xmdb_id=agent_id,
                                                 xmdb_type=agent,
                                                 title=plex_item.title,
                                                 quality_profile_id=quality_profile_number,
                                                 monitored=args.monitor)
        else:
            print("Missing either metadata agent or metadata agent ID")
        # Advance the progress bar whether or not the item was added.
        bar.next()
    bar.finish()
What does "Missing either metadata agent or metadata agent ID" mean? I got this for all items.
And can I use it with Tautulli?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
possible change?
just so you can read the line better
from
to ?!