Last active
May 2, 2023 03:34
-
-
Save neckothy/a1f5a45f86e82cd1f622bc54544155b2 to your computer and use it in GitHub Desktop.
send Komga library updates to Discord
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import re | |
import requests | |
import sseclient | |
import threading | |
import time | |
# Komga server config
KOMGA_HOST = "https://example.com/komga"
KOMGA_USERNAME = "catgirl.luvr2012@hotmail.com"
KOMGA_PASSWORD = "hunter2"
# Library IDs whose events are skipped entirely
IGNORED_LIBRARIES = ()  # ("248EYRY8PE0KA", "561DGJL9PS4KZ")

# Discord webhook config
DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/etc"
DISCORD_WEBHOOK_NAME = "Komga"

# Timing config
EVENT_DELAY = 5  # Minutes, minimum announcement delay after event is detected
SCAN_INTERVAL = 15  # Minutes, delay between announcement checks
ANNOUNCEMENT_INTERVAL = 60  # Minutes, minimum delay between 2 announcements
RECONNECT_INTERVAL = 15  # Seconds, delay between attempts to reconnect to the SSE stream in case of connection loss

# Advanced config - mostly settings that can be changed to make it maybe sorta work with chapter or comic libraries instead
# regex used for volume number matching
# (raw string: "\d" in a normal string literal is an invalid escape sequence)
VOLUME_PATTERN = r" v(\d{2,3}) "
# string in front of the volume numbers, e.g. v = (v10), c = (c10)
VOLUME_STRING_PREFIX = "v"
# zero pad numbers to this many digits, e.g. 2 = (v01), 3 = (v001)
VOLUME_STRING_PADDING = 2
# event types we care about (probably no reason to touch this)
EVENT_TYPES = (
    "BookAdded",
    "SeriesAdded",
    "BookDeleted",
    "SeriesDeleted",
    "LibraryDeleted",
)
# Shared requests session used for the Komga API lookups and the SSE event stream
session = requests.Session()
def split_field(text, limit=1024):
    """Split *text* into chunks of at most *limit* characters.

    Chunks are cut at the last newline found before the limit so that lines
    stay intact; the newline used as the cut point is dropped. A single line
    that is longer than *limit* (no newline available) is hard-split at
    exactly *limit* characters instead of raising ``ValueError``.

    Args:
        text: The text to split.
        limit: Maximum chunk length (defaults to 1024).

    Returns:
        A list of non-empty chunks; empty when *text* is empty.

    Raises:
        ValueError: If *limit* is smaller than 1.
    """
    if limit < 1:
        raise ValueError("limit must be at least 1")

    chunks = []
    while len(text) > limit:
        cut = text.rfind("\n", 0, limit - 1)
        if cut > 0:
            chunks.append(text[:cut])
            text = text[cut + 1 :]
        elif cut == 0:
            # Leading newline is only a separator; skip it rather than
            # emitting an empty chunk.
            text = text[1:]
        else:
            # No newline before the limit: fall back to a hard split.
            chunks.append(text[:limit])
            text = text[limit:]
    if text:
        chunks.append(text)
    return chunks
def get_volume_str(vol_ranges):
    """Format volume ranges as a parenthesised suffix, e.g. `` (v01-03, 05)``.

    Each number is zero-padded to ``VOLUME_STRING_PADDING`` digits and the
    whole list is prefixed once with ``VOLUME_STRING_PREFIX``.

    Args:
        vol_ranges: Iterable of ``(start, end)`` pairs; a pair whose padded
            start and end are equal is rendered as a single number.

    Returns:
        The formatted suffix (with a leading space), or an empty string when
        *vol_ranges* is empty.
    """
    parts = []
    for vol_range in vol_ranges:
        start = str(vol_range[0]).zfill(VOLUME_STRING_PADDING)
        end = str(vol_range[1]).zfill(VOLUME_STRING_PADDING)
        parts.append(start if start == end else f"{start}-{end}")
    if not parts:
        # Nothing to show; avoid emitting an unbalanced " (v".
        return ""
    return f" ({VOLUME_STRING_PREFIX}{', '.join(parts)})"
def get_ranges(numbers):
    """Collapse *numbers* into sorted ``(start, end)`` runs of consecutive values.

    Duplicates are ignored. Two values belong to the same run unless the
    later one is more than 1 greater than the earlier one. Produces the same
    result as the zip-of-edges recipe from https://stackoverflow.com/a/48106843.

    Returns:
        A list of ``(start, end)`` tuples; empty when *numbers* is empty.
    """
    ordered = sorted(set(numbers))
    if not ordered:
        return []

    runs = []
    run_start = previous = ordered[0]
    for value in ordered[1:]:
        if previous + 1 < value:
            # Gap found: close the current run and open a new one.
            runs.append((run_start, previous))
            run_start = value
        previous = value
    runs.append((run_start, previous))
    return runs
def get_name_by_id(content_type, content_id, timeout=30):
    """Fetch a Komga object and return its ``name`` field.

    Requests ``{KOMGA_HOST}/api/v1/{content_type}/{content_id}`` with the
    configured credentials over the shared session.

    Args:
        content_type: API path segment, e.g. ``"libraries"`` or ``"books"``.
        content_id: ID of the object to look up.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot block the announcement loop indefinitely.

    Returns:
        The value of the ``name`` key in the JSON response.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status.
    """
    url = f"{KOMGA_HOST}/api/v1/{content_type}/{content_id}"
    with session.get(
        url, auth=(KOMGA_USERNAME, KOMGA_PASSWORD), timeout=timeout
    ) as response:
        response.raise_for_status()
        return response.json()["name"]
def get_title_by_id(content_type, content_id, timeout=30):
    """Fetch a Komga object and return its ``metadata.title`` field.

    Requests ``{KOMGA_HOST}/api/v1/{content_type}/{content_id}`` with the
    configured credentials over the shared session.

    Args:
        content_type: API path segment, e.g. ``"series"``.
        content_id: ID of the object to look up.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot block the announcement loop indefinitely.

    Returns:
        The value of ``["metadata"]["title"]`` in the JSON response.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status.
    """
    url = f"{KOMGA_HOST}/api/v1/{content_type}/{content_id}"
    with session.get(
        url, auth=(KOMGA_USERNAME, KOMGA_PASSWORD), timeout=timeout
    ) as response:
        response.raise_for_status()
        return response.json()["metadata"]["title"]
def send_discord_message(payload, timeout=30):
    """POST *payload* as JSON to the configured Discord webhook.

    Args:
        payload: JSON-serialisable webhook body.
        timeout: Seconds to wait for Discord before giving up, so a stalled
            connection cannot block the announcement loop indefinitely.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status.
    """
    with requests.post(DISCORD_WEBHOOK_URL, json=payload, timeout=timeout) as response:
        response.raise_for_status()
def build_discord_payload(data):
    """Build the Discord webhook payload announcing the pending updates.

    One embed field is produced per library (several numbered fields when the
    text exceeds the 1024-character field limit). Inside a field, series are
    listed under "New Series" when their ID is in the module-level
    ``new_series`` list and under "Updated Series" otherwise, each sorted
    case-insensitively and followed by the volume numbers parsed from the
    book names with ``VOLUME_PATTERN``.

    Args:
        data: Mapping of library ID -> {series ID -> list of book IDs}.

    Returns:
        A dict with ``username`` and a single embed holding the fields.

    Raises:
        requests.RequestException: If a Komga name/title lookup fails.
    """
    # Honour the configurable pattern instead of a hard-coded copy of it,
    # and compile it once rather than once per book.
    volume_regex = re.compile(VOLUME_PATTERN)
    fields = []

    for library_id, series_dict in data.items():
        new_series_lines, updated_series_lines = [], []
        library_name = get_name_by_id("libraries", library_id)
        field_name = f"**{library_name}**"

        for series_id, book_ids in series_dict.items():
            series_name = get_title_by_id("series", series_id)
            volume_numbers = []
            for book_id in book_ids:
                book_name = get_name_by_id("books", book_id)
                match = volume_regex.search(book_name)
                if match:
                    volume_numbers.append(int(match.group(1)))

            if volume_numbers:
                volume_str = get_volume_str(get_ranges(volume_numbers))
            else:
                volume_str = ""

            line_text = f"{series_name}{volume_str}"
            if series_id in new_series:
                new_series_lines.append(line_text)
            else:
                updated_series_lines.append(line_text)

        new_series_text = "\n".join(sorted(new_series_lines, key=str.casefold))
        updated_series_text = "\n".join(sorted(updated_series_lines, key=str.casefold))

        field_text = ""
        if new_series_text:
            field_text += f"**New Series**\n{new_series_text}\n"
        if updated_series_text:
            field_text += f"**Updated Series**\n{updated_series_text}"

        if len(field_text) > 1024:
            # Too long for one field: emit numbered continuation fields.
            for i, ft in enumerate(split_field(field_text)):
                fields.append(
                    {
                        "name": f"{field_name} ({i+1})",
                        "value": ft.strip(),
                        "inline": False,
                    }
                )
        else:
            fields.append(
                {"name": field_name, "value": field_text.strip(), "inline": False}
            )

    return {"username": DISCORD_WEBHOOK_NAME, "embeds": [{"fields": fields}]}
def handle_event(event):
    """Apply one SSE event to the pending-announcement state.

    Events whose type is not in ``EVENT_TYPES`` are ignored. For the others,
    ``last_event_time`` is refreshed first; events from libraries listed in
    ``IGNORED_LIBRARIES`` are then dropped. Added books are recorded in
    ``updated_libraries`` (library -> series -> book IDs), added series in
    ``new_series``, and deletions remove the matching pending entries,
    pruning series and libraries that become empty.
    """
    global updated_libraries
    global new_series
    global last_event_time

    kind = event.event
    if kind not in EVENT_TYPES:
        return

    last_event_time = time.time()
    details = json.loads(event.data)
    library_id = details.get("libraryId")
    if library_id in IGNORED_LIBRARIES:
        return
    series_id = details.get("seriesId")
    book_id = details.get("bookId")

    if kind == "BookAdded":
        pending_books = updated_libraries.setdefault(library_id, {}).setdefault(
            series_id, []
        )
        pending_books.append(book_id)
        return
    if kind == "SeriesAdded":
        new_series.append(series_id)
        return
    if kind == "LibraryDeleted":
        updated_libraries.pop(library_id, None)
        return

    # Remaining kinds are BookDeleted / SeriesDeleted; both only matter when
    # the series currently has pending books.
    library_series = updated_libraries.get(library_id)
    if library_series is None or series_id not in library_series:
        return

    if kind == "BookDeleted":
        pending_books = library_series[series_id]
        if book_id not in pending_books:
            return
        pending_books.remove(book_id)
        if not pending_books:
            del library_series[series_id]
    else:
        del library_series[series_id]

    # Drop the library once its last pending series is gone.
    if not library_series:
        del updated_libraries[library_id]
def stream_events():
    """Listen to Komga's SSE stream forever, feeding events to handle_event.

    Whenever the connection fails, the server answers with an error status,
    or the stream ends, the reason is printed and a new connection is
    attempted after ``RECONNECT_INTERVAL`` seconds. Never returns.
    """
    while True:
        try:
            with session.get(
                f"{KOMGA_HOST}/sse/v1/events",
                auth=(KOMGA_USERNAME, KOMGA_PASSWORD),
                stream=True,
            ) as stream_response:
                # Surface bad credentials / wrong URL instead of treating the
                # error page as an (empty) event stream.
                stream_response.raise_for_status()
                print("Connected to stream, listening for events")
                client = sseclient.SSEClient(stream_response)
                for event in client.events():
                    handle_event(event)
            print(f"Event stream closed, reconnecting in {RECONNECT_INTERVAL}s")
        except Exception as error:
            # Best-effort listener: report the problem and keep retrying.
            print(f"Event stream error ({error!r}), reconnecting in {RECONNECT_INTERVAL}s")
        # Always wait before reconnecting so a stream that closes cleanly (or
        # is rejected immediately) cannot turn into a tight request loop.
        time.sleep(RECONNECT_INTERVAL)
if __name__ == "__main__":
    # Shared state must exist before the SSE thread starts: handle_event
    # reads and mutates these globals as soon as the first event arrives.
    last_announcement_time = 0
    last_event_time = 0
    updated_libraries = {}
    new_series = []

    # Daemon thread so the process exits when the main loop stops
    # (Ctrl-C or an unhandled error) instead of hanging on the listener.
    sse_thread = threading.Thread(target=stream_events, daemon=True)
    sse_thread.start()

    while True:
        current_time = time.time()
        # Announce only when the previous announcement is old enough and the
        # library has been quiet for EVENT_DELAY minutes.
        announcement_due = (
            current_time - last_announcement_time >= ANNOUNCEMENT_INTERVAL * 60
            and current_time - last_event_time >= EVENT_DELAY * 60
        )
        if announcement_due and updated_libraries:
            series_announcement_payload = build_discord_payload(updated_libraries)
            send_discord_message(series_announcement_payload)
            updated_libraries = {}
            new_series = []
            last_announcement_time = current_time
        else:
            time.sleep(SCAN_INTERVAL * 60)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
requirements: | |
probably some not-ancient version of python | |
requests | |
sseclient-py (https://pypi.org/project/sseclient-py/) | |
usage: | |
save script | |
modify settings at the top | |
run with python | |
notes: | |
mostly designed for manga/ln "standard" filenames, but there are a few configs to help support others | |
will split fields which exceed the field character limit (1024) but will likely error if an embed exceeds Discord's overall embed limits (6000 characters in total, 25 fields per embed)
splitting embeds could be added similarly to splitting fields, I just don't think it's needed | |
this was fully rewritten recently to be less bad and hopefully more readable | |
hasn't been tested much since, let me know if you encounter issues |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment