Skip to content

Instantly share code, notes, and snippets.

@neckothy
Last active May 2, 2023 03:34
Show Gist options
  • Save neckothy/a1f5a45f86e82cd1f622bc54544155b2 to your computer and use it in GitHub Desktop.
send Komga library updates to Discord
import json
import re
import requests
import sseclient
import threading
import time
# Komga server config
KOMGA_HOST = "https://example.com/komga"
KOMGA_USERNAME = "catgirl.luvr2012@hotmail.com"
KOMGA_PASSWORD = "hunter2"
# Library ids to skip entirely, e.g. ("248EYRY8PE0KA", "561DGJL9PS4KZ")
IGNORED_LIBRARIES = ()

# Discord webhook config
DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/etc"
DISCORD_WEBHOOK_NAME = "Komga"

# Timing config
EVENT_DELAY = 5  # Minutes, minimum announcement delay after event is detected
SCAN_INTERVAL = 15  # Minutes, delay between announcement checks
ANNOUNCEMENT_INTERVAL = 60  # Minutes, minimum delay between 2 announcements
RECONNECT_INTERVAL = 15  # Seconds, delay between attempts to reconnect to the SSE stream in case of connection loss

# Advanced config - mostly settings that can be changed to make it maybe sorta work with chapter or comic libraries instead
# Regex used for volume number matching.  Raw string: "\d" in a plain string
# is an invalid escape sequence (SyntaxWarning on modern Python).
VOLUME_PATTERN = r" v(\d{2,3}) "
# string in front of the volume numbers, e.g. v = (v10), c = (c10)
VOLUME_STRING_PREFIX = "v"
# zero pad numbers to this many digits, e.g. 2 = (v01), 3 = (v001)
VOLUME_STRING_PADDING = 2
# event types we care about (probably no reason to touch this)
EVENT_TYPES = (
    "BookAdded",
    "SeriesAdded",
    "BookDeleted",
    "SeriesDeleted",
    "LibraryDeleted",
)
# Shared requests.Session used for all Komga API and SSE calls below.
session = requests.Session()
def split_field(text):
    """Split *text* into chunks of at most 1024 chars (Discord's field limit).

    Prefers to break at the last newline that fits in a chunk.  Previously
    used ``rindex`` with a 1023 cutoff, which (a) capped chunks at 1022
    chars and (b) raised ValueError — crashing the announcement — when a
    window contained no newline; now falls back to a hard split instead.
    """
    chunks = []
    while len(text) > 1024:
        # Last newline whose preceding text still fits in one field.
        break_at = text.rfind("\n", 0, 1025)
        if break_at == -1:
            # No newline available: hard-split at the limit.
            chunks.append(text[:1024])
            text = text[1024:]
        else:
            chunks.append(text[:break_at])
            text = text[break_at + 1:]
    if text:
        chunks.append(text)
    return chunks
def get_volume_str(vol_ranges):
    """Format (start, end) volume ranges as e.g. " (v01-03, 05)".

    Uses the VOLUME_STRING_PREFIX / VOLUME_STRING_PADDING settings; a
    single-volume range renders as "NN" rather than "NN-NN".
    """
    if not vol_ranges:
        # Match the original's output for an empty range list.
        return f" ({VOLUME_STRING_PREFIX}"
    parts = []
    for first, last in vol_ranges:
        lo = str(first).zfill(VOLUME_STRING_PADDING)
        hi = str(last).zfill(VOLUME_STRING_PADDING)
        parts.append(lo if lo == hi else f"{lo}-{hi}")
    return f" ({VOLUME_STRING_PREFIX}" + ", ".join(parts) + ")"
def get_ranges(numbers):
    """Collapse an iterable of ints into sorted (start, end) runs.

    Duplicates are dropped; consecutive values merge into one range,
    e.g. [1, 2, 3, 7] -> [(1, 3), (7, 7)].
    """
    runs = []
    for value in sorted(set(numbers)):
        if runs and value == runs[-1][1] + 1:
            # Extends the current consecutive run.
            runs[-1][1] = value
        else:
            runs.append([value, value])
    return [tuple(run) for run in runs]
def get_name_by_id(content_type, content_id):
    """Return the "name" field of a Komga resource (library/book) by id.

    Raises requests.HTTPError on a non-2xx response.
    """
    endpoint = f"{KOMGA_HOST}/api/v1/{content_type}/{content_id}"
    response = session.get(endpoint, auth=(KOMGA_USERNAME, KOMGA_PASSWORD))
    with response:
        response.raise_for_status()
        return response.json()["name"]
def get_title_by_id(content_type, content_id):
    """Return the metadata title of a Komga resource (e.g. a series) by id.

    Raises requests.HTTPError on a non-2xx response.
    """
    endpoint = f"{KOMGA_HOST}/api/v1/{content_type}/{content_id}"
    response = session.get(endpoint, auth=(KOMGA_USERNAME, KOMGA_PASSWORD))
    with response:
        response.raise_for_status()
        return response.json()["metadata"]["title"]
def send_discord_message(payload):
    """POST *payload* to the configured Discord webhook; raise on HTTP error."""
    response = requests.post(DISCORD_WEBHOOK_URL, json=payload)
    with response:
        response.raise_for_status()
def build_discord_payload(data):
    """Build the Discord webhook payload announcing pending library updates.

    *data* maps library_id -> {series_id: [book_id, ...]}.  Reads the
    module-level ``new_series`` list to decide whether a series is listed
    under "New Series" or "Updated Series".  One embed field is emitted per
    library; fields longer than Discord's 1024-char field limit are split
    into numbered sub-fields via split_field().
    """
    payload = {"username": DISCORD_WEBHOOK_NAME, "embeds": [{"fields": []}]}
    # Fix: use the configurable VOLUME_PATTERN (the pattern was previously
    # hard-coded here, silently ignoring the setting); compile it once.
    volume_re = re.compile(VOLUME_PATTERN)
    for library_id, series_dict in data.items():
        new_series_lines, updated_series_lines = [], []
        library_name = get_name_by_id("libraries", library_id)
        field_name = f"**{library_name}**"
        for series_id, book_ids in series_dict.items():
            series_name = get_title_by_id("series", series_id)
            volume_numbers = []
            for book_id in book_ids:
                book_name = get_name_by_id("books", book_id)
                match = volume_re.search(book_name)
                if match:
                    volume_numbers.append(int(match.group(1)))
            # Books without a parseable volume number get no "(vNN)" suffix.
            if volume_numbers:
                volume_str = get_volume_str(get_ranges(volume_numbers))
            else:
                volume_str = ""
            line_text = f"{series_name}{volume_str}"
            if series_id in new_series:
                new_series_lines.append(line_text)
            else:
                updated_series_lines.append(line_text)
        new_series_text = "\n".join(sorted(new_series_lines, key=str.casefold))
        updated_series_text = "\n".join(sorted(updated_series_lines, key=str.casefold))
        field_text = ""
        if new_series_text:
            field_text += f"**New Series**\n{new_series_text}\n"
        if updated_series_text:
            field_text += f"**Updated Series**\n{updated_series_text}"
        if len(field_text) > 1024:
            # Discord rejects field values over 1024 chars; number the parts.
            for i, ft in enumerate(split_field(field_text)):
                payload["embeds"][0]["fields"].append(
                    {
                        "name": f"{field_name} ({i + 1})",
                        "value": ft.strip(),
                        "inline": False,
                    }
                )
        else:
            payload["embeds"][0]["fields"].append(
                {"name": field_name, "value": field_text.strip(), "inline": False}
            )
    return payload
def handle_event(event):
    """Fold one Komga SSE event into the pending-announcement state.

    Mutates the module-level ``updated_libraries`` dict and ``new_series``
    list, and stamps ``last_event_time`` so announcements wait for a quiet
    period.  Events outside EVENT_TYPES or from IGNORED_LIBRARIES are
    dropped.
    """
    global updated_libraries
    global new_series
    global last_event_time
    if event.event not in EVENT_TYPES:
        return
    last_event_time = time.time()
    data = json.loads(event.data)
    library_id = data.get("libraryId")
    if library_id in IGNORED_LIBRARIES:
        return
    series_id = data.get("seriesId")
    book_id = data.get("bookId")
    kind = event.event
    if kind == "BookAdded":
        # Record the book under its library/series, creating levels as needed.
        updated_libraries.setdefault(library_id, {}).setdefault(series_id, []).append(book_id)
    elif kind == "SeriesAdded":
        new_series.append(series_id)
    elif kind == "BookDeleted":
        book_list = updated_libraries.get(library_id, {}).get(series_id)
        if book_list is not None and book_id in book_list:
            book_list.remove(book_id)
            # Prune empty series / library entries so they aren't announced.
            if not book_list:
                del updated_libraries[library_id][series_id]
                if not updated_libraries[library_id]:
                    del updated_libraries[library_id]
    elif kind == "SeriesDeleted":
        library = updated_libraries.get(library_id)
        if library is not None and series_id in library:
            del library[series_id]
            if not library:
                del updated_libraries[library_id]
    elif kind == "LibraryDeleted":
        updated_libraries.pop(library_id, None)
def stream_events():
    """Listen to the Komga SSE endpoint forever, reconnecting on failure.

    Runs on a worker thread; each received event is passed to handle_event().
    Previously a bare ``except:`` swallowed even KeyboardInterrupt/SystemExit
    and hid the failure reason; now only ordinary exceptions trigger the
    reconnect delay, and the error is printed.  raise_for_status() is called
    so an auth/URL misconfiguration surfaces instead of looping silently.
    """
    while True:
        try:
            with session.get(
                f"{KOMGA_HOST}/sse/v1/events",
                auth=(KOMGA_USERNAME, KOMGA_PASSWORD),
                stream=True,
            ) as stream_response:
                stream_response.raise_for_status()
                print("Connected to stream, listening for events")
                client = sseclient.SSEClient(stream_response)
                for event in client.events():
                    handle_event(event)
        except Exception as exc:
            print(f"SSE stream error: {exc!r}; reconnecting in {RECONNECT_INTERVAL}s")
            time.sleep(RECONNECT_INTERVAL)
if __name__ == "__main__":
    # Daemon thread so Ctrl+C / an exception in the main loop actually ends
    # the process (a non-daemon SSE thread would keep it alive forever).
    sse_thread = threading.Thread(target=stream_events, daemon=True)
    sse_thread.start()
    # Shared state mutated by handle_event() on the SSE thread.
    last_announcement_time = 0
    last_event_time = 0
    updated_libraries = {}  # library_id -> {series_id: [book_id, ...]}
    new_series = []  # series ids seen in SeriesAdded events this cycle
    while True:
        current_time = time.time()
        # Announce only when the last announcement is old enough, the last
        # event is old enough (quiet period), and there is something to say.
        spaced_out = current_time - last_announcement_time >= ANNOUNCEMENT_INTERVAL * 60
        quieted_down = current_time - last_event_time >= EVENT_DELAY * 60
        if spaced_out and quieted_down and updated_libraries:
            send_discord_message(build_discord_payload(updated_libraries))
            updated_libraries = {}
            new_series = []
            last_announcement_time = current_time
        else:
            # Collapses the original's two identical sleep branches.
            time.sleep(SCAN_INTERVAL * 60)
requirements:
probably some not-ancient version of python
requests
sseclient-py (https://pypi.org/project/sseclient-py/)
usage:
save script
modify settings at the top
run with python
notes:
mostly designed for manga/ln "standard" filenames, but there are a few configs to help support others
will split fields which exceed the field character limit (1024) but will likely error if an embed exceeds the embed character limit (4096)
splitting embeds could be added similarly to splitting fields, I just don't think it's needed
this was fully rewritten recently to be less bad and hopefully more readable
hasn't been tested much since, let me know if you encounter issues
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment