# Last active April 19, 2024 19:49
# Discord Channel Archive Bot
#!/usr/bin/env python3
import requests
from pprint import pprint
from box import Box
from pathlib import Path
import shutil
import json
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import urllib.parse
from PIL import Image , ImageDraw , ImageFont
import textwrap
def download_file(options):
    """Download one URL to one local path, showing a tqdm progress bar.

    Args:
        options: Two-item sequence ``(url, destination)``; ``destination`` is a
            ``pathlib.Path`` or a path string.

    Returns:
        True when the file already existed (and overwriting is off) or was
        downloaded completely; False when the download failed or came up short.
        Failures are printed rather than raised so one bad URL does not abort
        a whole batch.
    """
    url = options[0]
    destination = Path(options[1])

    # OVERWRITE is an optional module-level switch. It is not defined in the
    # visible part of this module, so fall back to "do not overwrite" instead
    # of raising NameError when it is absent.
    overwrite = globals().get("OVERWRITE", False)
    if not overwrite and destination.is_file() and destination.stat().st_size > 1:
        return True

    block_size = 1024
    received = 0
    try:
        # The context managers close the connection, the file and the progress
        # bar even when the transfer is interrupted.
        with requests.get(url, stream=True, timeout=60) as r:
            # Do not store an HTTP error page under the attachment's name.
            r.raise_for_status()
            total_size = int(r.headers.get("content-length", 0))
            with open(destination, "wb") as f, tqdm(
                total=total_size, unit="iB", unit_scale=True
            ) as t:
                for data in r.iter_content(block_size):
                    t.update(len(data))
                    f.write(data)
                    received += len(data)
    except (requests.RequestException, OSError, ValueError) as e:
        print(e)
        return False

    if total_size != 0 and received != total_size:
        print("ERROR , something went wrong")
        return False
    return True
def write_json(file_path, python_object):
    """Dump ``python_object`` to ``file_path`` as UTF-8 JSON with 4-space indentation.

    Non-ASCII characters are written literally rather than as ``\\uXXXX`` escapes.
    """
    with open(file_path, mode="w", encoding="utf-8") as sink:
        json.dump(python_object, sink, indent=4, ensure_ascii=False)
def read_json(file_path):
    """Load and return the JSON document stored at ``file_path``.

    The file is decoded as UTF-8 explicitly. ``write_json`` emits UTF-8 with
    literal non-ASCII characters, so relying on the platform default encoding
    would break the round trip on systems whose locale is not UTF-8.
    """
    with open(file_path, encoding="utf-8") as f:
        return json.load(f)
def batch_process(options):
    """Run one function over a list of work items on a thread pool.

    Args:
        options: Mapping with
            ``"batch_list"`` - sized iterable of work items,
            ``"function_reference"`` - callable invoked once per item,
            ``"max_workers"`` - optional thread count; when omitted the
            ``ThreadPoolExecutor`` default is used.

    Returns:
        List of the callable's results, in the same order as ``batch_list``.
    """
    batch_list = options["batch_list"]
    batch_size = len(batch_list)
    with ThreadPoolExecutor(max_workers=options.get("max_workers")) as executor:
        # executor.map yields results in submission order; tqdm only wraps the
        # iterator to display progress as results are consumed.
        result_pool = list(
            tqdm(
                executor.map(options["function_reference"], iter(batch_list)),
                total=batch_size,
            )
        )
    return result_pool
def supports_char(font, char):
    """Return True when rendering ``char`` with ``font`` yields a non-empty mask.

    Args:
        font: Object exposing ``getmask(text)`` whose result has ``getbbox()``
            (for example a PIL ``ImageFont``).
        char: The character (string) to probe.

    Returns:
        True if the rendered mask has a bounding box, False if the bounding box
        is ``None`` or the font rejects the input with ``TypeError``.
    """
    try:
        # getbbox() is None for an empty mask. The previous check, `char in
        # bbox`, compared a string against a tuple of ints and so could never
        # be True.
        return font.getmask(char).getbbox() is not None
    except TypeError:
        return False
class DisordChannelArchiverBot:
    """Archive Discord channels (message history plus attachments) with a bot token.

    Configuration is a mapping wrapped in a ``Box``. Keys read by this class:
    ``token``, ``write_text_to_image``, ``image_text_font_path``,
    ``fallback_font_path``, ``image_text_font_size``,
    ``image_text_paragraph_size`` (optional), ``image_background_color``,
    ``image_text_fill_color``, ``image_text_max_width``,
    ``image_text_max_height`` and ``image_text_left_padding``.
    """

    # Root of the Discord REST API; every endpoint path below is appended to it.
    API_BASE_URL = "https://discord.com/api/v10"
    # Seconds to wait on an API call so a stalled connection cannot hang the run.
    REQUEST_TIMEOUT = 30

    def __init__(self, config=None):
        """Store the configuration, build the auth headers and load fonts if needed.

        Args:
            config: Mapping of settings (see the class docstring). ``token`` is
                required; the font settings are only read when
                ``write_text_to_image`` is truthy.
        """
        # A fresh dict per instance; a shared mutable default would leak state.
        self.config = Box(config if config is not None else {})
        self.headers = {
            "accept": "application/json, text/plain, */*",
            "Authorization": f"Bot {self.config.token}",
        }
        if self.config.get("write_text_to_image", False):
            self.image_text_font = ImageFont.truetype(
                self.config.image_text_font_path, self.config.image_text_font_size
            )
            self.fallback_font = ImageFont.truetype(
                self.config.fallback_font_path, self.config.image_text_font_size
            )
        # self.get_guilds()

    def _api_get(self, path, params=None):
        """GET ``API_BASE_URL + path`` and return the decoded JSON body.

        Raises:
            requests.HTTPError: when the API answers with an error status, so
                callers never mistake an error object for real data.
        """
        response = requests.get(
            f"{self.API_BASE_URL}{path}",
            headers=self.headers,
            params=params if params is not None else {},
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def enumerate_request(self, limit=100, after=False):
        """Placeholder kept for interface compatibility.

        The source contains only this method's signature, so it performs no
        work and returns None.
        """
        return None

    def get_guilds(self):
        """Fetch the guilds visible to the bot, cache them on ``self.guilds`` and return them."""
        limit = 200
        self.guilds = self._api_get("/users/@me/guilds", {"limit": limit})
        return self.guilds

    def get_guild_channels(self, guild_id):
        """Return the channel objects of the guild ``guild_id``."""
        return self._api_get(f"/guilds/{guild_id}/channels")

    def get_channel_messages(self, channel_id):
        """Download the full message history of a channel.

        Messages are requested 100 at a time. Each follow-up request asks for
        the messages *before* the oldest one already collected.

        Returns:
            List of message dicts in the order the API delivers them
            (latest message first, first message last).
        """
        limit = 100
        params = {"limit": limit}
        path = f"/channels/{channel_id}/messages"
        messages = self._api_get(path, params)
        # arrives in reverse order , aka latest message = array[0] , first message = array[-1]
        pprint(messages)
        if len(messages) < limit:
            return messages
        iterations = 1
        while True:
            print(f"Gathering {limit} new messages , Round = {iterations} , Total = {len( messages )}")
            # The oldest message collected so far is the LAST element. Using the
            # newest id as the cursor re-fetches almost the same page each round.
            params["before"] = messages[-1]["id"]
            new_messages = self._api_get(path, params)
            messages = messages + new_messages
            iterations += 1
            if len(new_messages) < limit:
                break
        # print( len( messages ) )
        return messages

    def write_text_to_image(self, text, output_path):
        """Render ``text`` onto a canvas, trim the unused area and save it.

        Args:
            text: Message text; explicit line breaks start new paragraphs.
            output_path: Destination file passed to ``Image.save``.
        """
        max_width = self.config["image_text_max_width"]
        max_height = self.config["image_text_max_height"]

        # Create a new image with the specified background color
        image = Image.new("RGB", (max_width, max_height), self.config["image_background_color"])
        draw = ImageDraw.Draw(image)

        # textwrap counts characters, not pixels, so wrapping at the pixel width
        # effectively never wraps.
        # NOTE(review): assumes image_text_paragraph_size is the intended wrap
        # column; falls back to the previous value when the key is absent.
        wrap_width = self.config.get("image_text_paragraph_size", max_width)

        # Split text into paragraphs by explicit line breaks, then wrap each paragraph
        wrapped_text = []
        for paragraph in text.split("\n"):
            wrapped_text.extend(textwrap.wrap(paragraph, width=wrap_width))

        # Calculate font height using a typical character pair; 10 px line spacing
        font_height = draw.textbbox((0, 0), "Hy", font=self.image_text_font)[3]
        line_step = font_height + 10
        total_text_height = len(wrapped_text) * line_step

        # Starting y position that vertically centers the text block
        current_y = (max_height - total_text_height) // 2
        current_x = self.config["image_text_left_padding"]

        # Running bounding box of everything drawn, used for trimming below
        min_x, min_y, max_x, max_y = max_width, max_height, 0, 0
        for line in wrapped_text:
            draw.text(
                (current_x, current_y),
                line,
                font=self.image_text_font,
                fill=self.config["image_text_fill_color"],
            )
            bbox = draw.textbbox((current_x, current_y), line, font=self.image_text_font)
            min_x = min(min_x, bbox[0])
            min_y = min(min_y, bbox[1])
            max_x = max(max_x, bbox[2])
            max_y = max(max_y, bbox[3])
            current_y += line_step

        # Trim the image if necessary. With nothing drawn (whitespace-only text)
        # the bounding box is inverted, so cropping is skipped.
        padding = 10
        if wrapped_text and (max_x + padding < max_width or max_y + padding < max_height):
            image = image.crop((
                max(0, min_x - padding),
                max(0, min_y - padding),
                min(max_width, max_x + padding),
                min(max_height, max_y + padding),
            ))

        # Save the final image
        image.save(output_path)

    def _collect_downloads(self, output_directory, messages):
        """Build the ``[url, destination]`` list for a batch of messages.

        Files are numbered consecutively in message order. Text-only messages
        are rendered to PNG immediately when ``write_text_to_image`` is enabled
        and take a number as well.
        """
        download_list = []
        zfill_number = len(str(len(messages))) + 1
        render_text = bool(self.config.get("write_text_to_image", False))
        item_total = 1

        def numbered_path(suffix):
            # Reads the current value of item_total at call time.
            return output_directory.joinpath(f"{str(item_total).zfill(zfill_number)}{suffix}")

        for message in messages:
            # Entries without an "attachments" key are skipped entirely.
            if not isinstance(message, dict) or "attachments" not in message:
                continue

            attachments = message["attachments"] or []
            if attachments:
                for attachment in attachments:
                    if "url" not in attachment or "filename" not in attachment:
                        # Dump the odd message for inspection and move on.
                        pprint(message)
                        continue
                    suffix = Path(attachment["filename"]).suffix
                    download_list.append([attachment["url"], numbered_path(suffix)])
                    item_total += 1
                continue

            embeds = message.get("embeds") or []
            if embeds:
                for embed in embeds:
                    thumbnail = embed.get("thumbnail")
                    if not thumbnail or "proxy_url" not in thumbnail:
                        continue
                    # Extension = first three characters after the last dot of
                    # the thumbnail URL; "jpe" is the truncated form of "jpeg".
                    source_url = thumbnail.get("url") or thumbnail["proxy_url"]
                    file_type = source_url.split(".")[-1][0:3]
                    if file_type == "jpe":
                        file_type = "jpeg"
                    download_list.append([thumbnail["proxy_url"], numbered_path(f".{file_type}")])
                    item_total += 1
                continue

            # Neither attachments nor embeds: optionally keep the text itself.
            if render_text and message.get("content"):
                self.write_text_to_image(message["content"], numbered_path(".png"))
                item_total += 1

        return download_list

    def download_all_message_attachments(self, output_directory, messages):
        """Save every attachment / embed thumbnail of ``messages`` into ``output_directory``.

        Args:
            output_directory: Target directory (``Path`` or string); must exist.
            messages: Sequence of message dicts as returned by the API.

        Returns:
            The list of ``[url, destination]`` pairs that were queued.
        """
        download_list = self._collect_downloads(Path(output_directory), messages)
        # pprint( download_list )
        if not download_list:
            # Nothing to fetch; avoid spinning up a thread pool.
            return download_list
        batch_process({
            "max_workers": 10,
            "batch_list": download_list,
            "function_reference": download_file,
        })
        return download_list

    def get_channel(self, channel_id):
        """Fetch, print and return the channel object for ``channel_id``."""
        result = self._api_get(f"/channels/{channel_id}")
        pprint(result)
        return result

    def archive_channel(self, channel_id, output_base_directory=False):
        """Archive one channel: ``messages.json`` plus an ``attachments`` folder.

        Args:
            channel_id: Id of the channel to archive.
            output_base_directory: Target directory; when falsy,
                ``<cwd>/<channel_id>-<channel name>`` is used.
        """
        channel = self.get_channel(channel_id)
        # Fall back to a fixed label when the channel object carries no name.
        channel_name = channel.get("name") or "unnamed"
        if not output_base_directory:
            output_base_directory = Path.cwd().joinpath(f"{channel_id}-{channel_name}")
        output_base_directory = Path(output_base_directory)
        output_base_directory.mkdir(parents=True, exist_ok=True)
        attachment_base_directory = output_base_directory.joinpath("attachments")
        # shutil.rmtree( str( attachment_base_directory ) , ignore_errors=True )
        attachment_base_directory.mkdir(parents=True, exist_ok=True)
        message_archive_save_path = output_base_directory.joinpath("messages.json")

        print(f"1.) Downloading Message Archive of {channel_name}")
        messages = self.get_channel_messages(channel_id)
        write_json(str(message_archive_save_path), messages)
        print(f"saved {len(messages)} messages")

        print(f"2.) Downloading Attachments from {channel_name}")
        self.download_all_message_attachments(attachment_base_directory, messages)

    def archive_guild(self, guild_id, output_base_directory=False):
        """Archive every channel of a guild into one sub-directory per channel.

        Args:
            guild_id: Id of the guild to archive.
            output_base_directory: Target directory; when falsy,
                ``<cwd>/<guild_id>`` is used.
        """
        if not output_base_directory:
            output_base_directory = Path.cwd().joinpath(str(guild_id))
        output_base_directory = Path(output_base_directory)
        output_base_directory.mkdir(parents=True, exist_ok=True)
        channels = self.get_guild_channels(guild_id)
        total_channels = len(channels)
        for i, channel in enumerate(channels):
            print(f"Processing Channel [ {i+1} ] of {total_channels}")
            if not isinstance(channel, dict) or "id" not in channel:
                continue
            channel_name = channel.get("name") or "unnamed"
            channel_output_directory = output_base_directory.joinpath(f"{channel['id']}-{channel_name}")
            channel_output_directory.mkdir(parents=True, exist_ok=True)
            try:
                self.archive_channel(channel["id"], channel_output_directory)
            except requests.RequestException as error:
                # Keep going: one channel that cannot be read must not abort
                # the archive of the remaining channels.
                print(f"Skipping channel {channel['id']} : {error}")
if __name__ == "__main__":
    # Example run. "asdf" is a placeholder for both the bot token and the guild
    # id; the font paths point at the author's machine. Replace all of them
    # before running.
    bot = DisordChannelArchiverBot({
        "token": "asdf",
        "write_text_to_image": True,
        "image_text_font_path": "/Users/morpheous/WORKSPACE/PYTHON/DiscordChannelArchiver/fonts/comic_sans.ttf",
        "fallback_font_path": "/Users/morpheous/WORKSPACE/PYTHON/DiscordChannelArchiver/fonts/NotoColorEmoji-Regular.ttf",
        "image_text_font_size": 40,
        "image_text_paragraph_size": 100,
        "image_background_color": "black",
        "image_text_fill_color": "white",
        "image_text_max_width": 1900,
        "image_text_max_height": 800,
        "image_text_left_padding": 10,
    })
    # guilds = bot.get_guilds()
    # pprint( guilds )
    bot.archive_guild("asdf")
