Last active
April 19, 2024 19:49
-
-
Save 0187773933/d69b95917eee599500ad3cf811c996c3 to your computer and use it in GitHub Desktop.
Discord Channel Archive Bot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import requests | |
from pprint import pprint | |
from box import Box | |
from pathlib import Path | |
import shutil | |
import json | |
from tqdm import tqdm | |
from concurrent.futures import ThreadPoolExecutor | |
import urllib.parse | |
# When True, download_file() always re-fetches and overwrites the destination.
# Set to False to skip destinations that already exist with more than one byte.
OVERWRITE = True
# https://stackoverflow.com/questions/1970807/center-middle-align-text-with-pil#1970930 | |
from PIL import Image , ImageDraw , ImageFont | |
import textwrap | |
def download_file( options ):
    """Stream one URL to disk, showing a tqdm progress bar.

    Args:
        options: A two-item sequence ``[ url , destination_path ]``. It is a
            single argument so the function can be handed to ``executor.map``.

    Returns:
        True when the file was skipped (already present and ``OVERWRITE`` is
        False) or fully written; False when the request failed, the server
        answered with an HTTP error, or the number of bytes received does not
        match the advertised ``content-length``.

    The function is deliberately best-effort: every exception is reported and
    turned into a False result so one bad URL cannot abort a whole batch.
    """
    url = options[ 0 ]
    destination = Path( options[ 1 ] )
    try:
        if not OVERWRITE and destination.is_file() and destination.stat().st_size > 1:
            return True
        # ( connect , read ) timeouts : without them a stalled transfer would
        # block its worker thread forever.
        with requests.get( url , stream=True , timeout=( 10 , 60 ) ) as response:
            # Fail before opening the destination so an HTTP error page is
            # never saved as if it were the requested file.
            response.raise_for_status()
            total_size = int( response.headers.get( "content-length" , 0 ) )
            block_size = 1024
            with open( destination , "wb" ) as f , tqdm( total=total_size , unit="iB" , unit_scale=True ) as progress:
                for data in response.iter_content( block_size ):
                    progress.update( len( data ) )
                    f.write( data )
                received = progress.n
        if total_size != 0 and received != total_size:
            print( "ERROR , something went wrong" )
            return False
        return True
    except Exception as e:
        print( f"Download failed for {url} : {e}" )
        return False
def write_json( file_path , python_object ):
    """Serialize `python_object` to `file_path` as UTF-8 JSON.

    The output is indented by four spaces and non-ASCII characters are written
    verbatim rather than as ``\\uXXXX`` escapes.
    """
    with open( file_path , mode="w" , encoding="utf-8" ) as handle:
        json.dump( python_object , handle , indent=4 , ensure_ascii=False )
def read_json( file_path ):
    """Load and return the JSON document stored at `file_path`.

    The file is decoded as UTF-8 to match write_json(), which writes UTF-8
    with non-ASCII characters unescaped; relying on the platform default
    encoding would break the round trip wherever that default is not UTF-8.
    """
    with open( file_path , encoding="utf-8" ) as f:
        return json.load( f )
def batch_process( options ):
    """Run a function over a list of items on a thread pool, with a progress bar.

    Args:
        options: Mapping with the keys
            "batch_list"         -- sized sequence of items to process,
            "function_reference" -- callable applied to each item,
            "max_workers"        -- optional thread count; when absent or None
                                    the ThreadPoolExecutor default is used.

    Returns:
        The list of results, in the same order as "batch_list".
    """
    batch_list = options[ "batch_list" ]
    # "max_workers" used to be accepted by callers but silently ignored.
    with ThreadPoolExecutor( max_workers=options.get( "max_workers" ) ) as executor:
        results = executor.map( options[ "function_reference" ] , batch_list )
        return list( tqdm( results , total=len( batch_list ) ) )
def supports_char( font , char ):
    """Report whether `font` renders `char` to a non-empty glyph mask.

    The previous test, ``char in font.getmask( char ).getbbox()``, compared a
    string against the integers of the bounding box (or hit TypeError when the
    box was None), so it could only ever return False.

    Args:
        font: Object exposing ``getmask( text )`` whose result exposes
            ``getbbox()`` (e.g. a PIL ImageFont).
        char: The text to probe.

    Returns:
        True when the rendered mask has a bounding box, False when it is empty
        or when the font rejects the input with TypeError.

    NOTE(review): a font may draw a placeholder glyph for characters it lacks,
    which would still produce a bounding box -- confirm this is precise enough
    before using it to choose a fallback font.
    """
    try:
        return font.getmask( char ).getbbox() is not None
    except TypeError:
        return False
class DisordChannelArchiverBot:
    """Archive Discord channels (messages plus attachments) with a bot token.

    Configuration keys read by this class:
        token                      -- bot token, sent as "Authorization: Bot <token>"
        write_text_to_image        -- when truthy, text-only messages are rendered to PNG
        image_text_font_path       -- TrueType font used for rendered text
        fallback_font_path         -- second TrueType font (loaded, not otherwise used here)
        image_text_font_size       -- point size for both fonts
        image_text_paragraph_size  -- optional wrap width in characters
        image_text_max_width       -- canvas width in pixels
        image_text_max_height      -- canvas height in pixels
        image_text_left_padding    -- x offset of every text line in pixels
        image_background_color     -- canvas colour
        image_text_fill_color      -- text colour
    """

    API_BASE_URL = "https://discord.com/api"
    # Seconds to wait on each API call before giving up instead of hanging.
    REQUEST_TIMEOUT = 30
    # Pixels added below every rendered text line.
    LINE_SPACING = 10
    # Pixels kept around the text when the rendered image is trimmed.
    CROP_PADDING = 10
    # Extension used when a thumbnail URL carries none.
    UNKNOWN_EXTENSION = ".bin"
    # Characters that would split or invalidate a directory name.
    _UNSAFE_PATH_CHARACTERS = str.maketrans( { character: "_" for character in '/\\:*?"<>|' } )

    def __init__( self , config=None ):
        """Store the configuration, build the auth headers and load fonts.

        Args:
            config: Mapping of the keys listed on the class. ``None`` is
                treated as an empty mapping (a shared mutable default is
                avoided); a missing "token" still raises, as before.
        """
        self.config = Box( config or {} )
        self.headers = {
            "accept": "application/json, text/plain, */*" ,
            "Authorization": f"Bot {self.config.token}"
        }
        if self.config.get( "write_text_to_image" ):
            self.image_text_font = ImageFont.truetype( self.config.image_text_font_path , self.config.image_text_font_size )
            self.fallback_font = ImageFont.truetype( self.config.fallback_font_path , self.config.image_text_font_size )

    def _get_json( self , path , params=None ):
        """GET ``API_BASE_URL/path`` and return the decoded JSON body.

        Raises:
            requests.exceptions.RequestException: on connection problems,
                timeouts, or a non-success HTTP status.
        """
        response = requests.get(
            f"{self.API_BASE_URL}/{path}" ,
            headers=self.headers ,
            params=params or {} ,
            timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _extension_from_url( url , default=".bin" ):
        """Return the file extension (with leading dot) of a URL's path.

        Only the path component is inspected, so query strings and host names
        never leak into the result; `default` is returned when the path has
        no extension.
        """
        suffix = Path( urllib.parse.urlparse( url ).path ).suffix
        return suffix if suffix else default

    @classmethod
    def _directory_name( cls , channel_id , channel_name ):
        """Build the "<id>-<name>" directory name for a channel.

        Path separators and other characters commonly rejected in file names
        are replaced with "_"; a missing name becomes "unnamed".
        """
        safe_name = str( channel_name or "unnamed" ).translate( cls._UNSAFE_PATH_CHARACTERS )
        return f"{channel_id}-{safe_name}"

    def enumerate_request( self , limit=100 , after=False ):
        """Unimplemented placeholder; does nothing and returns None."""
        # https://discord.com/developers/docs/resources/channel#get-channel-messages
        # https://discord.com/developers/docs/reference#snowflakes
        pass

    def get_guilds( self ):
        """Fetch the guilds of the current user, cache them on ``self.guilds`` and return them.

        A single request with ``limit=200`` is made; no further pages are fetched.
        """
        # https://discord.com/developers/docs/resources/user#get-current-user-guilds
        self.guilds = self._get_json( "users/@me/guilds" , { "limit": 200 } )
        return self.guilds

    def get_guild_channels( self , guild_id ):
        """Return the channel list of `guild_id` as decoded JSON."""
        # https://discord.com/developers/docs/resources/guild#get-guild-channels
        return self._get_json( f"guilds/{guild_id}/channels" )

    def get_channel_messages( self , channel_id ):
        """Return every message of a channel, oldest first.

        Pages of up to 100 messages are requested, each one "before" the
        oldest message seen so far, until a short page signals the start of
        the channel.
        """
        # https://discord.com/developers/docs/resources/channel#get-channel-messages
        limit = 100
        params = { "limit": limit }
        path = f"channels/{channel_id}/messages"
        # Each page arrives newest-first; reverse it so page[ 0 ] is its oldest message.
        page = self._get_json( path , params )
        page.reverse()
        pages = [ page ]
        total = len( page )
        iterations = 1
        while len( page ) >= limit:
            print( f"Gathering {limit} new messages , Round = {iterations} , Total = {total}" )
            params[ "before" ] = page[ 0 ][ "id" ]
            page = self._get_json( path , params )
            page.reverse()
            pages.append( page )
            total += len( page )
            iterations += 1
        # Pages were collected newest-first; stitch them oldest-first in one
        # pass instead of re-copying the whole list for every page.
        messages = []
        for older_page in reversed( pages ):
            messages.extend( older_page )
        return messages

    def write_text_to_image( self , text , output_path ):
        """Render `text` as a trimmed image and save it to `output_path`.

        Explicit line breaks are kept as paragraph boundaries and each
        paragraph is wrapped to "image_text_paragraph_size" characters. When
        that key is absent the pixel width is used as before, which in
        practice means no wrapping.

        Returns:
            True when an image was written, False when `text` contains
            nothing drawable (empty or whitespace only) and no file was
            produced.
        """
        max_width = self.config[ "image_text_max_width" ]
        max_height = self.config[ "image_text_max_height" ]
        # textwrap counts characters, not pixels.
        wrap_columns = self.config.get( "image_text_paragraph_size" ) or max_width
        wrapped_text = []
        for paragraph in text.split( "\n" ):
            wrapped_text.extend( textwrap.wrap( paragraph , width=wrap_columns ) )
        if not wrapped_text:
            # Nothing to draw; cropping an untouched canvas would produce an
            # inverted (right < left) box.
            return False
        image = Image.new( "RGB" , ( max_width , max_height ) , self.config[ "image_background_color" ] )
        draw = ImageDraw.Draw( image )
        # Line height is taken from a sample containing an ascender and a descender.
        font_height = draw.textbbox( ( 0 , 0 ) , "Hy" , font=self.image_text_font )[ 3 ]
        line_height = font_height + self.LINE_SPACING
        # Start so that the block of lines is vertically centred on the canvas.
        current_y = ( max_height - len( wrapped_text ) * line_height ) // 2
        current_x = self.config[ "image_text_left_padding" ]
        # Running bounding box of everything drawn, used for trimming.
        min_x , min_y , max_x , max_y = max_width , max_height , 0 , 0
        for line in wrapped_text:
            draw.text( ( current_x , current_y ) , line , font=self.image_text_font , fill=self.config[ "image_text_fill_color" ] )
            bbox = draw.textbbox( ( current_x , current_y ) , line , font=self.image_text_font )
            min_x = min( min_x , bbox[ 0 ] )
            min_y = min( min_y , bbox[ 1 ] )
            max_x = max( max_x , bbox[ 2 ] )
            max_y = max( max_y , bbox[ 3 ] )
            current_y += line_height
        padding = self.CROP_PADDING
        if max_x + padding < max_width or max_y + padding < max_height:
            image = image.crop((
                max( 0 , min_x - padding ) ,
                max( 0 , min_y - padding ) ,
                min( max_width , max_x + padding ) ,
                min( max_height , max_y + padding )
            ))
        image.save( output_path )
        return True

    def _collect_downloads( self , output_directory , messages ):
        """Number every archivable item and return the list of pending downloads.

        Items are numbered in message order with zero-padded names. Messages
        without an "attachments" key are ignored. For the others:
          * attachments present -> one download per attachment, keeping the
            extension of its "filename";
          * no attachments, no embeds -> the text content is rendered to a PNG
            right away when "write_text_to_image" is enabled;
          * no attachments, embeds present -> one download per embed thumbnail
            that has a "proxy_url".

        Returns:
            A list of ``[ url , destination_path ]`` pairs.
        """
        output_directory = Path( output_directory )
        render_text = bool( self.config.get( "write_text_to_image" ) )
        zfill_number = len( str( len( messages ) ) ) + 1
        download_list = []
        item_total = 1
        for message in messages:
            if "attachments" not in message:
                continue
            attachments = message[ "attachments" ]
            if attachments:
                for attachment in attachments:
                    if "url" not in attachment or "filename" not in attachment:
                        # Dump the unexpected message so it can be inspected.
                        pprint( message )
                        continue
                    name = f"{str( item_total ).zfill( zfill_number )}{Path( attachment[ 'filename' ] ).suffix}"
                    download_list.append([ attachment[ "url" ] , output_directory.joinpath( name ) ])
                    item_total += 1
                continue
            embeds = message.get( "embeds" ) or []
            if not embeds:
                content = message.get( "content" ) or ""
                if render_text and content.strip():
                    output_path = output_directory.joinpath( f"{str( item_total ).zfill( zfill_number )}.png" )
                    if self.write_text_to_image( content , output_path ):
                        item_total += 1
                continue
            for embed in embeds:
                thumbnail = embed.get( "thumbnail" )
                if not thumbnail or "proxy_url" not in thumbnail:
                    continue
                # The extension comes from the original URL when present; the
                # file itself is fetched through the proxy URL.
                source_url = thumbnail.get( "url" ) or thumbnail[ "proxy_url" ]
                extension = self._extension_from_url( source_url , self.UNKNOWN_EXTENSION )
                name = f"{str( item_total ).zfill( zfill_number )}{extension}"
                download_list.append([ thumbnail[ "proxy_url" ] , output_directory.joinpath( name ) ])
                item_total += 1
        return download_list

    def download_all_message_attachments( self , output_directory , messages ):
        """Save every attachment, embed thumbnail and (optionally) text message of `messages`.

        Text images are written while the items are being numbered; the
        remaining files are downloaded concurrently afterwards.
        """
        download_list = self._collect_downloads( output_directory , messages )
        batch_process({
            "max_workers": 10 ,
            "batch_list": download_list ,
            "function_reference": download_file
        })

    def get_channel( self , channel_id ):
        """Fetch, print and return the channel object for `channel_id`."""
        result = self._get_json( f"channels/{channel_id}" )
        pprint( result )
        return result

    def archive_channel( self , channel_id , output_base_directory=False ):
        """Archive one channel into a directory.

        Writes ``messages.json`` (all messages, oldest first) and an
        ``attachments`` sub-directory.

        Args:
            channel_id: Id of the channel to archive.
            output_base_directory: Target directory (str or Path). When
                falsy, "<channel_id>-<channel name>" under the current
                working directory is used.
        """
        channel = self.get_channel( channel_id )
        channel_name = channel.get( "name" )
        if not output_base_directory:
            output_base_directory = Path.cwd().joinpath( self._directory_name( channel_id , channel_name ) )
        output_base_directory = Path( output_base_directory )
        output_base_directory.mkdir( parents=True , exist_ok=True )
        attachment_base_directory = output_base_directory.joinpath( "attachments" )
        attachment_base_directory.mkdir( parents=True , exist_ok=True )
        message_archive_save_path = output_base_directory.joinpath( "messages.json" )
        print( f"1.) Downloading Message Archive of {channel_name}" )
        messages = self.get_channel_messages( channel_id )
        write_json( str( message_archive_save_path ) , messages )
        print( f"saved {len(messages)} messages" )
        print( f"2.) Downloading Attachments from {channel_name}" )
        self.download_all_message_attachments( attachment_base_directory , messages )

    def archive_guild( self , guild_id , output_base_directory=False ):
        """Archive every channel of a guild, one sub-directory per channel.

        A channel whose API requests fail (for instance one the bot cannot
        read) is reported and skipped so the remaining channels are still
        archived.

        Args:
            guild_id: Id of the guild to archive.
            output_base_directory: Target directory (str or Path). When
                falsy, "<guild_id>" under the current working directory is
                used.
        """
        if not output_base_directory:
            output_base_directory = Path.cwd().joinpath( str( guild_id ) )
        output_base_directory = Path( output_base_directory )
        output_base_directory.mkdir( parents=True , exist_ok=True )
        channels = self.get_guild_channels( guild_id )
        total_channels = len( channels )
        for i , channel in enumerate( channels ):
            print( f"Processing Channel [ {i+1} ] of {total_channels}" )
            if "id" not in channel:
                continue
            channel_output_directory = output_base_directory.joinpath(
                self._directory_name( channel[ "id" ] , channel.get( "name" ) )
            )
            try:
                self.archive_channel( channel[ "id" ] , channel_output_directory )
            except requests.exceptions.RequestException as error:
                print( f"Skipping channel {channel[ 'id' ]} : {error}" )


# Correctly spelled alias; the original (misspelled) name stays valid for existing callers.
DiscordChannelArchiverBot = DisordChannelArchiverBot
if __name__ == "__main__":
    # Example run: the token and guild id below are placeholders and the font
    # paths point at the author's machine -- replace them before use.
    example_config = {
        "token": "asdf" ,
        "write_text_to_image": True ,
        "image_text_font_path": "/Users/morpheous/WORKSPACE/PYTHON/DiscordChannelArchiver/fonts/comic_sans.ttf" ,
        "fallback_font_path": "/Users/morpheous/WORKSPACE/PYTHON/DiscordChannelArchiver/fonts/NotoColorEmoji-Regular.ttf" ,
        "image_text_font_size": 40 ,
        "image_text_paragraph_size": 100 ,
        "image_background_color": "black" ,
        "image_text_fill_color": "white" ,
        "image_text_max_width": 1900 ,
        "image_text_max_height": 800 ,
        "image_text_left_padding": 10 ,
    }
    archiver = DisordChannelArchiverBot( example_config )
    # To list the guilds visible to the bot instead : pprint( archiver.get_guilds() )
    archiver.archive_guild( "asdf" )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment