Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save Shomesomesho/3eb2e26d66c10f8bcc4f220f6b7b1e30 to your computer and use it in GitHub Desktop.
Save Shomesomesho/3eb2e26d66c10f8bcc4f220f6b7b1e30 to your computer and use it in GitHub Desktop.
This script will read your Overseer data and create/apply user tags to all of your sonarr/radarr instances
"""
This script will read your Overseer data and create/apply user tags to all of your sonarr/radarr instances, then create a filter in each connected -arr application for the users you specify.
It is forward compatible with the future User Tagging feature of overseer, and formats the tag in the same 'id - lowercase username' pattern Overseer will
It only uses built in python libraries, so you should be able to download and run without much hassle.
NOTE: YOU ARE REQUIRED TO USE IP:PORT CONNECTIONS FOR YOUR SONARR/RADARR INSTANCES INSIDE OF OVERSEERR
This will NOT utilize docker-compose style hostnames at the moment, and I don't use them personally, so I don't see myself adding them
Steps to use:
1. Add your Overseer API key
2. Add your Overseer Internal URL (might work with external domain url, but I didn't test)
2.5 Edit the Default values for number of users/requests
3. Press Play and wait
"""
import requests
from requests import HTTPError
from typing import Any
import re
import logging
from requests.models import Response
from urllib3.util import Retry
OVERSEER_API_KEY = "YOURAPIKEY"
OVERSEER_URL = "http://YOURIP:PORT/api/v1"
# I didn't want to figure out Pagination, so I set defaults to what I felt would be the maximum someone could have.
# If you have more than 100 users, or a user has more than 1000 requests, you'll need to update these values to reflect that
NUM_USERS_TO_PULL = 100
NUM_MAX_USER_REQUESTS = 1000
def handle_response(response: Response, *args: Any, **kwargs: Any) -> None:
"""Handles the Response and throws an error if there is an error with the request
Args:
response (Response): The response of the call being made
Raises:
requests.exceptions.HTTPError: Error raised by API
"""
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
logger.error(
f"{response.status_code} - {response.request.url} - {response.text}"
)
raise requests.exceptions.HTTPError(f"{str(e)}: {response.text}") from e
def make_session(api_key: str) -> requests.Session:
"""Creates a Requests Session with headers and logging set up
Args:
api_key (str): API key of service being accessed with the session
Returns:
requests.Session: Requests session with overhead added
"""
session = requests.Session()
session.hooks["response"] = [handle_response]
adapter = requests.adapters.HTTPAdapter(
max_retries=Retry(
total=10,
backoff_factor=5,
status_forcelist=[429, 500],
allowed_methods=["GET", "POST", "PUT"],
)
)
session.mount("https://", adapter)
session.headers.update({"X-Api-Key": api_key})
return session
def map_arr_api_keys() -> dict[str, str]:
"""Gets all sonarr/radarr servers + api keys from Overseer and returns a map of them
Returns:
dict[str,str]: A Map of -arr server_urls : api_keys
"""
requests_session = make_session(api_key=OVERSEER_API_KEY)
sonarr_servers = requests_session.get(url=OVERSEER_URL + "/settings/sonarr").json()
radarr_servers = requests_session.get(url=OVERSEER_URL + "/settings/radarr").json()
all_servers = sonarr_servers + radarr_servers
api_key_map = {}
for server in all_servers:
api_key_map[server["hostname"] + ":" + str(server["port"])] = server["apiKey"]
return api_key_map
def tag_requests_from_user(
arr_api_key_map: dict[str, str],
user_requests: dict[str, Any],
user_tag_string: str,
) -> None:
"""Tags all the requests for each user
Args:
arr_api_key_map (dict[str, str]): Map of the server URL and API key for each service connected to overseer
user_requests (dict[str, Any]): list of user requests
user_tag_string (str): Formatted user tag name. Follows "id - lowercase username" format
"""
for media_request in user_requests:
try:
tag_request_from_user(
media_request=media_request,
arr_api_key_map=arr_api_key_map,
user_tag_string=user_tag_string,
)
except ValueError as e:
logger.error(e)
def tag_request_from_user(
media_request: dict[str, Any], arr_api_key_map: dict[str, str], user_tag_string: str
):
"""Reads request data from Overseer, and finds the media within Sonarr/Radarr, and applies a user tag to that item in its respective server
Args:
media_request (dict[str, Any]): The Media Request metadata provided by Overseerr API
arr_api_key_map (dict[str, str]): Map of all servers connected to Overseerr, and their API keys
user_tag_string (str): Formatted user tag name. Follows "id - lowercase username" format
"""
if media_request["status"] == 4:
raise ValueError(
f"{arr_object_data['title']} has ERROR request status - Skipping"
)
if "serviceUrl" not in media_request["media"]:
raise ValueError(
f"{arr_object_data['title']} has no ServiceURL associated with it - Skipping"
)
# Unfortunately the provided service URL doesn't include the /v3/api slug, so we have to build our own
non_api_url = media_request["media"]["serviceUrl"]
ip_port = re.findall(r"[0-9]+(?:\.[0-9]+){3}:[0-9]+", non_api_url)[0]
base_url = "http://" + ip_port
service_url = base_url + "/api/v3"
if media_request["type"] == "tv":
request_path = "/series"
request_params = {"tvdbId": media_request["media"]["tvdbId"]}
else:
request_path = "/movie"
request_params = {"tmdbId": media_request["media"]["tmdbId"]}
requests_session = make_session(api_key=arr_api_key_map[ip_port])
arr_object_data = requests_session.get(
url=service_url + request_path,
params=request_params,
).json()
if len(arr_object_data) == 0:
raise ValueError(
f"{base_url} - {media_request['media']['externalServiceSlug']} is in the user's request list, but not found on server - Skipping"
)
arr_object_data = arr_object_data[0]
tag_data = requests_session.get(
url=service_url + "/tag",
).json()
# Because each request has its own server associated with it, we should check for the tag each time.
# The alternate way would be to group by server, then do one check per server, but we don't need to worry about api calls here
tag_id = get_tag_id(tag_data, user_tag_string)
if tag_id == -1:
logger.warning(f'{base_url} - Tag "{user_tag_string}" not found in server.')
tag_creation_response = create_user_tag(
requests_session=requests_session,
service_url=service_url,
user_tag_string=user_tag_string,
)
if tag_creation_response.ok:
tag_id = tag_creation_response.json()["id"]
logger.info(f"{base_url} - Created tag {user_tag_string} with id: {tag_id}")
else:
raise HTTPError(f'{base_url} - Failed to create tag "{user_tag_string}"')
if tag_id in arr_object_data["tags"]:
logger.info(
f"{base_url} - {user_tag_string} - {arr_object_data['title']} already has user tag"
)
else:
tag_addition_response = tag_media_with_user_data(
requests_session=requests_session,
service_url=service_url,
request_path=request_path,
request_params=request_params,
arr_object_data=arr_object_data,
tag_id=tag_id,
)
if tag_addition_response.ok:
logger.info(
f"{base_url} - {user_tag_string} - Tagged {arr_object_data['title']}"
)
else:
raise HTTPError(tag_addition_response.text)
def get_tag_id(tag_data: dict[str, Any], user_tag_string: str) -> int:
"""Gets the tagId of the user's tag from the respective server.
Args:
tag_data (dict[str, Any]): The Tag Data from the -arr api
user_tag_string (str): The tag name for the current overseer user
Returns:
int: The tagId of the respective -arr instance. Returns -1 if it doesn't exist
"""
for tag in tag_data:
if tag["label"] == user_tag_string:
return tag["id"]
return -1
def create_user_tag(
requests_session: requests.Session,
service_url: str,
user_tag_string: str,
) -> dict[str, Any]:
"""Create a user tag in Sonarr/Radarr
Args:
requests_session (requests.Session): Requests session for app you are creating tag in
service_url (str): the URL of the app you are creating the tag in
user_tag_string (str): tag string, which will be the tag name
Returns:
dict[str, Any]: Tag creation return data, including new ID
"""
return requests_session.post(
url=service_url + "/tag",
json={"label": user_tag_string},
)
def tag_media_with_user_data(
requests_session: requests.Session,
service_url: str,
request_path: str,
request_params: dict[str, Any],
arr_object_data: dict[str, Any],
tag_id: int,
) -> requests.Response:
"""Applies tag to selected media object
Args:
requests_session (requests.Session): Requests session for app you are apply tag in
service_url (str): URL of app
request_path (str): Slug to interact with media object
request_params (dict[str, Any]): Extra request params to dictate the media object
arr_object_data (dict[str, Any]): Media Object metadata from Sonarr/Radarr
tag_id (int): Tag ID to apply to arr_object_data
Returns:
requests.Response: Response from tag call
"""
if tag_id not in arr_object_data["tags"]:
arr_object_data["tags"].append(tag_id)
return requests_session.put(
url=service_url + request_path,
params=request_params,
json=arr_object_data,
)
def create_tag_filter_in_application(
arr_api_key_map: dict[str, str], user_tag_string: str
):
"""Create a custom filter in each server for the user tag
Args:
arr_api_key_map (dict[str, str]): Map of -arr URLs:API Keys
user_tag_string (str): Tag Name for the current user
"""
for server in arr_api_key_map:
base_url = "http://" + server + "/api/v3"
requests_session = make_session(api_key=arr_api_key_map[server])
current_filters = requests_session.get(url=base_url + "/customfilter").json()
current_filter_labels = [x["label"] for x in current_filters]
if user_tag_string not in current_filter_labels:
tag_info = requests_session.get(url=base_url + "/tag").json()
tag_id = get_tag_id(tag_data=tag_info, user_tag_string=user_tag_string)
server_info = requests_session.get(url=base_url + "/system/status").json()
if server_info["appName"].lower() == "sonarr":
filter_type = "series"
else:
filter_type = "movieIndex"
sonarr_filter = {
"type": filter_type,
"label": user_tag_string,
"filters": [{"key": "tags", "value": [tag_id], "type": "contains"}],
}
requests_session.post(url=base_url + "/customfilter", json=sonarr_filter)
logger.info(f"http://{server} - {user_tag_string} - Created Filter")
else:
logger.warning(
f"http://{server} - {user_tag_string} - Filter Already Exists - Skipping"
)
def main():
arr_api_key_map = map_arr_api_keys()
overseer_requests_session = make_session(api_key=OVERSEER_API_KEY)
all_users = overseer_requests_session.get(
url=OVERSEER_URL + "/user", params={"take": NUM_USERS_TO_PULL}
).json()["results"]
for user in all_users:
user_data = overseer_requests_session.get(
url=OVERSEER_URL + f"/user/{user['id']}"
).json()
# My users don't have a ton of requests, so I didn't want to bother figuring out pagination.
# This should just pull all requests (unless you have users who request A TON)
user_requests = overseer_requests_session.get(
url=OVERSEER_URL + f"/user/{user['id']}/requests",
params={"take": NUM_MAX_USER_REQUESTS},
).json()["results"]
user_tag_string = (
str(user_data["id"]) + " - " + user_data["displayName"].lower()
)
separator = "\n==============================================\n"
print(
separator
+ f" Tagging {user_data['displayName']}'s Media"
+ separator
)
if len(user_requests) > 0:
tag_requests_from_user(arr_api_key_map, user_requests, user_tag_string)
create_tag_filter_in_application(arr_api_key_map, user_tag_string)
else:
logger.warning(f"{user['displayName']} has no requests - Skipping")
if __name__ == "__main__":
# create logger with 'spam_application'
logger = logging.getLogger("overseer_tagger")
logger.setLevel(logging.INFO)
fh = logging.StreamHandler()
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment