Skip to content

Instantly share code, notes, and snippets.

@st1vms
Created February 16, 2024 11:36
Show Gist options
  • Save st1vms/54004c002c8a189c534c24e59fdca453 to your computer and use it in GitHub Desktop.
Save st1vms/54004c002c8a189c534c24e59fdca453 to your computer and use it in GitHub Desktop.
Fetch Discord threads data from a specific channel
"""Discord threads crawler module"""
from datetime import datetime
from dataclasses import dataclass
import requests
# --- User-supplied configuration (all blank by default) ---------------------

# Sent as the "User-Agent" request header.
USER_AGENT = ""

# Sent as the "Authorization" request header.
AUTH_HEADER = ""

# Sent as the "Cookie" request header.
COOKIE_STR = ""

# Upper bound on the number of threads fetched by the script entry point.
LIMIT = 25

# Tag strings used to filter the thread search; an empty list disables filtering.
TAGS = []

# Guild that owns the target channel.
GUILD_ID = ""

# Channel whose threads are searched.
CHANNEL_ID = ""

# --- Request constants -------------------------------------------------------

# Root of the channels API; the channel ID and search path are appended to it.
BASE_URL = "https://discord.com/api/v9/channels"

# Headers shared by every request. The values taken from the configuration
# above are captured once, when this dict is built.
BASE_HEADERS = {
    "Host": "discord.com",
    "User-Agent": USER_AGENT,
    "Accept": "*/*",
    "Accept-Language": "en-US,en;q=0.5",
    "Authorization": AUTH_HEADER,
    "X-Discord-Locale": "it",
    "X-Discord-Timezone": "Europe/Rome",
    "X-Debug-Options": "bugReporterEnabled",
    "DNT": "1",
    "Sec-GPC": "1",
    "Connection": "keep-alive",
    "Cookie": COOKIE_STR,
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "TE": "trailers",
}
@dataclass(frozen=True)
class ThreadMetadata:
    """Immutable summary of a single thread returned by the search."""

    # Thread title (the "name" field of the thread object).
    name: str
    # Value of the thread's "message_count" field.
    msg_count: int
    # Parsed "create_timestamp" from the thread's metadata.
    creation_date: datetime
    # "archived" flag from the thread's metadata.
    is_archived: bool
    # "locked" flag from the thread's metadata.
    is_locked: bool
def __parse_thread_dict(thread: dict) -> ThreadMetadata:
    """Convert one raw thread dict from the search response into ThreadMetadata.

    Reads the top-level "name" and "message_count" keys and the
    "create_timestamp", "archived" and "locked" keys nested under
    "thread_metadata". A missing key raises KeyError.
    """
    title = thread["name"]
    messages = thread["message_count"]
    # Everything else lives in the nested metadata object.
    details = thread["thread_metadata"]
    created_at = datetime.fromisoformat(details["create_timestamp"])
    return ThreadMetadata(
        name=title,
        msg_count=messages,
        creation_date=created_at,
        is_archived=details["archived"],
        is_locked=details["locked"],
    )
def __get_endpoint(
    channel_id: str, limit: int = 25, tags: list[str] | None = None, offset: int = 0
) -> str:
    """Build the thread search URL for a channel.

    Args:
        channel_id: ID of the channel whose threads are searched.
        limit: Value of the ``limit`` query parameter.
        tags: Optional tags. When non-empty they are sent comma-separated in
            the ``tag`` parameter together with ``tag_setting=match_some``.
        offset: Value of the ``offset`` query parameter.

    Returns:
        The complete search URL.
    """
    # Local import keeps this helper self-contained.
    from urllib.parse import quote

    tag_string = ""
    if tags:
        # Percent-encode every tag so reserved characters (&, =, #, spaces...)
        # cannot corrupt the query string; the separating commas stay literal,
        # so plain alphanumeric tags produce the same URL as before.
        encoded_tags = ",".join(quote(str(tag), safe="") for tag in tags)
        tag_string = f"&tag={encoded_tags}&tag_setting=match_some"
    return (
        f"{BASE_URL}/{channel_id}"
        "/threads/search?archived=true&sort_by=last_message_time&sort_order=desc&"
        f"limit={limit}{tag_string}&offset={offset}"
    )
def get_threads(
    guild_id: str, channel_id: str, tags: list[str] | None = None, limit: int = 25
) -> list[ThreadMetadata] | None:
    """Fetch metadata for up to ``limit`` threads of a channel.

    Pages through the thread search endpoint, using the number of threads
    received so far as the offset, until ``limit`` threads were collected,
    the response reports no further results, or a request fails.

    Args:
        guild_id: ID of the guild; used only to build the ``Referer`` header.
        channel_id: ID of the channel whose threads are searched.
        tags: Optional tag strings forwarded to the search endpoint.
        limit: Maximum number of threads to return.

    Returns:
        The threads parsed so far (possibly empty). When a request does not
        yield a usable response, the response body is printed and whatever
        was collected up to that point is returned.
    """
    # Build a per-call header dict so the shared BASE_HEADERS is never mutated.
    headers = {
        **BASE_HEADERS,
        "Referer": f"https://discord.com/channels/{guild_id}/{channel_id}",
    }

    out: list[ThreadMetadata] = []
    count = 0
    while count < limit:
        # Ask only for what is still missing so the total cannot exceed `limit`.
        url = __get_endpoint(
            channel_id, limit=limit - count, tags=tags, offset=count
        )
        res = requests.get(url, headers=headers, timeout=10)

        payload = res.json() if res.status_code == 200 else None
        if not isinstance(payload, dict) or "threads" not in payload:
            print(f"Error : {res.content}")
            break

        page = payload["threads"]
        if not page:
            # An empty page makes no progress; stop instead of looping forever.
            break

        count += len(page)
        out.extend(__parse_thread_dict(t) for t in page)

        # Missing pagination fields are treated as "nothing more to fetch"
        # (has_more) or "unknown total" (total_results) rather than raising.
        total = payload.get("total_results")
        if not payload.get("has_more") or (total is not None and count >= total):
            break

    # Guard against a server that returns more items than were requested.
    return out[:limit]
if __name__ == "__main__":
    # Script entry point: fetch threads using the module-level configuration
    # and print one thread per line.
    found = get_threads(GUILD_ID, CHANNEL_ID, tags=TAGS, limit=LIMIT)
    print("\n".join(map(str, found)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment