Skip to content

Instantly share code, notes, and snippets.

@0187773933
Last active April 19, 2024 19:49
Show Gist options
  • Save 0187773933/d69b95917eee599500ad3cf811c996c3 to your computer and use it in GitHub Desktop.
Save 0187773933/d69b95917eee599500ad3cf811c996c3 to your computer and use it in GitHub Desktop.
Discord Channel Archive Bot
#!/usr/bin/env python3
import requests
from pprint import pprint
from box import Box
from pathlib import Path
import shutil
import json
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import urllib.parse
# OVERWRITE = False
OVERWRITE = True
# https://stackoverflow.com/questions/1970807/center-middle-align-text-with-pil#1970930
from PIL import Image , ImageDraw , ImageFont
import textwrap
def download_file( options ):
    """Download a single URL to a local file while showing a tqdm progress bar.

    Args:
        options: Two-item sequence ``[ url , destination ]``. ``destination`` is
            used as a ``pathlib.Path`` (``is_file()`` / ``stat()`` are called on it).

    Returns:
        True when the file was fully downloaded, or when the download was skipped
        because ``OVERWRITE`` is False and a file larger than one byte already
        exists. False when the request failed, the server answered with an
        error status, or fewer bytes arrived than ``content-length`` announced.

    Errors are printed rather than raised so that one bad URL does not abort a
    whole batch of downloads.
    """
    url , destination = options[ 0 ] , options[ 1 ]
    try:
        # Keep an existing non-trivial file when overwriting is disabled.
        if not OVERWRITE and destination.is_file() and destination.stat().st_size > 1:
            return True
        # The context manager releases the streamed connection even on failure ;
        # the timeout keeps a stalled server from hanging a worker thread forever.
        with requests.get( url , stream=True , timeout=30 ) as r:
            # Without this an HTTP error page would be saved as if it were the attachment.
            r.raise_for_status()
            total_size = int( r.headers.get( "content-length" , 0 ) )
            block_size = 1024
            with tqdm( total=total_size , unit="iB" , unit_scale=True ) as t:
                with open( str( destination ) , "wb" ) as f:
                    for data in r.iter_content( block_size ):
                        t.update( len( data ) )
                        f.write( data )
                received = t.n
        if total_size != 0 and received != total_size:
            print( "ERROR , something went wrong" )
            return False
        return True
    except Exception as e:
        # Best-effort by design : report the problem and let the batch continue.
        print( e )
        return False
def write_json( file_path , python_object ):
    """Write ``python_object`` to ``file_path`` as pretty-printed UTF-8 JSON.

    The object is serialized completely in memory before the file is opened.
    If serialization fails (``TypeError`` for an unsupported value) an already
    existing file is therefore left untouched instead of being truncated and
    filled with a partial document.

    Args:
        file_path: Destination path accepted by ``open``.
        python_object: Any JSON-serializable object.

    Raises:
        TypeError: When ``python_object`` contains a value ``json`` cannot encode.
    """
    serialized = json.dumps( python_object , ensure_ascii=False , indent=4 )
    with open( file_path , 'w' , encoding='utf-8' ) as f:
        f.write( serialized )
def read_json( file_path ):
    """Load and return the JSON document stored at ``file_path``.

    The file is decoded as UTF-8 to match ``write_json`` , which writes UTF-8
    with ``ensure_ascii=False`` ; relying on the platform default encoding would
    break on non-ASCII content wherever that default is not UTF-8.

    Args:
        file_path: Path accepted by ``open``.

    Returns:
        The decoded Python object.
    """
    with open( file_path , encoding='utf-8' ) as f:
        return json.load( f )
def batch_process( options ):
    """Apply a function to every item of a list on a thread pool , with a progress bar.

    Args:
        options: Dict with the keys
            "batch_list": list of items ; each item is passed as the single
                argument of the function.
            "function_reference": callable applied to each item.
            "max_workers" (optional): size of the thread pool. When absent ,
                ``ThreadPoolExecutor`` picks its own default.

    Returns:
        List of the function's return values , in the order of "batch_list".
    """
    batch_list = options[ "batch_list" ]
    # Honour the caller's pool size ; it used to be accepted but silently ignored.
    with ThreadPoolExecutor( max_workers=options.get( "max_workers" ) ) as executor:
        results = executor.map( options[ "function_reference" ] , batch_list )
        return list( tqdm( results , total=len( batch_list ) ) )
def supports_char( font , char ):
    """Report whether ``font`` renders ``char`` to a non-empty glyph.

    The previous check ( ``char in bbox`` ) compared a string against a tuple of
    integers , or against ``None`` , and therefore always returned False.

    Args:
        font: Object exposing ``getmask( text )`` whose result exposes
            ``getbbox()`` (the interface used here ; a PIL font in this file).
        char: The character to probe.

    Returns:
        True when the rendered mask has a bounding box , False when the bounding
        box is ``None`` or ``getmask`` rejects the input with ``TypeError``.

    NOTE(review): a font may draw a placeholder glyph for characters it lacks ,
    which would still yield a bounding box - confirm this heuristic is enough
    before relying on it for fallback-font selection.
    """
    try:
        return font.getmask( char ).getbbox() is not None
    except TypeError:
        return False
class DisordChannelArchiverBot:
    """Archive Discord channels through the REST API using a bot token.

    For every archived channel the full message history is written to
    ``messages.json`` and attachments / embed thumbnails are downloaded into an
    ``attachments`` directory. Text-only messages can optionally be rendered to
    PNG images (config key ``write_text_to_image``).

    Config keys read by this class:
        token: bot token placed in the ``Authorization`` header.
        write_text_to_image: enables rendering of text-only messages.
        image_text_font_path , image_text_font_size: font used for rendering.
        fallback_font_path: optional second font , loaded but not used here.
        image_text_max_width , image_text_max_height: canvas size in pixels.
        image_text_left_padding: x offset of every rendered line.
        image_background_color , image_text_fill_color: colors passed to PIL.
        image_text_paragraph_size: optional wrap width in characters.

    NOTE(review): HTTP 429 rate-limit responses are not retried ; they surface
    as ``requests.exceptions.HTTPError`` from ``raise_for_status``.
    """

    # Root of the REST API shared by every request below.
    API_BASE_URL = "https://discord.com/api"
    # Seconds before a stalled API request is abandoned.
    REQUEST_TIMEOUT = 30

    def __init__( self , config=None ):
        """Store the configuration , build the auth headers and load the fonts.

        Args:
            config: Mapping with the keys listed on the class. ``None`` behaves
                like an empty mapping (and then fails on the missing ``token``).
        """
        self.config = Box( config if config is not None else {} )
        self.headers = {
            "accept": "application/json, text/plain, */*" ,
            "Authorization": f"Bot {self.config.token}"
        }
        if self.config.get( "write_text_to_image" , False ):
            font_size = self.config.image_text_font_size
            self.image_text_font = ImageFont.truetype( self.config.image_text_font_path , font_size )
            # The fallback font is optional ; nothing in this class draws with it.
            fallback_font_path = self.config.get( "fallback_font_path" )
            self.fallback_font = ImageFont.truetype( fallback_font_path , font_size ) if fallback_font_path else None
        # self.get_guilds()

    def _api_get( self , path , params=None ):
        """GET ``path`` below ``API_BASE_URL`` and return the decoded JSON body.

        Raises:
            requests.exceptions.HTTPError: for any non-success status code.
        """
        response = requests.get(
            f"{self.API_BASE_URL}{path}" ,
            headers=self.headers ,
            params=params or {} ,
            timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.json()

    def enumerate_request( self , limit=100 , after=False ):
        """Placeholder ; not implemented and returns None."""
        # https://discord.com/developers/docs/resources/channel#get-channel-messages
        # https://discord.com/developers/docs/reference#snowflakes
        pass

    def get_guilds( self ):
        """Fetch the guilds of the current bot user , cache them on ``self.guilds`` and return them."""
        # https://discord.com/developers/docs/resources/user#get-current-user-guilds
        self.guilds = self._api_get( "/users/@me/guilds" , { "limit": 200 } )
        return self.guilds

    def get_guild_channels( self , guild_id ):
        """Return the channel objects of ``guild_id`` as decoded from the API."""
        # https://discord.com/developers/docs/resources/guild#get-guild-channels
        return self._api_get( f"/guilds/{guild_id}/channels" )

    def get_channel_messages( self , channel_id ):
        """Download the complete message history of a channel.

        Messages are requested 100 at a time , walking backwards with the
        ``before`` parameter until a short page signals the start of the channel.

        Returns:
            List of message dicts ordered oldest first.
        """
        # https://discord.com/developers/docs/resources/channel#get-channel-messages
        limit = 100
        params = { "limit": limit }
        path = f"/channels/{channel_id}/messages"
        pages = []
        total = 0
        while True:
            page = self._api_get( path , params )
            # arrives in reverse order , aka latest message = page[0] , first message = page[-1]
            page.reverse()
            pages.append( page )
            total += len( page )
            if len( page ) < limit:
                break
            print( f"Gathering {limit} new messages , Round = {len( pages )} , Total = {total}" )
            # After the reverse , page[0] is the oldest message fetched so far.
            params[ "before" ] = page[ 0 ][ "id" ]
        # Pages were collected newest block first ; flatten once instead of
        # rebuilding the whole list on every round.
        return [ message for page in reversed( pages ) for message in page ]

    def write_text_to_image( self , text , output_path ):
        """Render ``text`` left-aligned and vertically centered , trim and save it.

        Args:
            text: Message text ; explicit line breaks start new paragraphs.
            output_path: Destination passed to ``Image.save``.
        """
        max_width = self.config[ "image_text_max_width" ]
        max_height = self.config[ "image_text_max_height" ]
        left_padding = self.config[ "image_text_left_padding" ]
        line_spacing = 10
        crop_padding = 10

        # Create a new image with the specified background color
        image = Image.new( 'RGB' , ( max_width , max_height ) , self.config[ "image_background_color" ] )
        draw = ImageDraw.Draw( image )

        # textwrap counts characters , not pixels , so the pixel width of the
        # canvas practically never wraps anything. Prefer the configured
        # character width and keep the old value only as a fallback.
        # NOTE(review): "image_text_paragraph_size" is presumed to be the intended
        # wrap width since nothing else in this file reads it - confirm.
        wrap_width = self.config.get( "image_text_paragraph_size" ) or max_width
        wrapped_text = []
        for paragraph in text.split( '\n' ):
            wrapped_text.extend( textwrap.wrap( paragraph , width=wrap_width ) )

        # Calculate font height using a typical character pair
        font_height = draw.textbbox( ( 0 , 0 ) , 'Hy' , font=self.image_text_font )[ 3 ]
        line_height = font_height + line_spacing
        total_text_height = len( wrapped_text ) * line_height
        # Center vertically , but never start above the canvas : when the text is
        # taller than the image the beginning of the message stays visible.
        current_y = max( 0 , ( max_height - total_text_height ) // 2 )

        # Bounding box of everything drawn , used for trimming afterwards
        min_x , min_y , max_x , max_y = max_width , max_height , 0 , 0
        for line in wrapped_text:
            position = ( left_padding , current_y )
            draw.text( position , line , font=self.image_text_font , fill=self.config[ "image_text_fill_color" ] )
            bbox = draw.textbbox( position , line , font=self.image_text_font )
            min_x = min( min_x , bbox[ 0 ] )
            min_y = min( min_y , bbox[ 1 ] )
            max_x = max( max_x , bbox[ 2 ] )
            max_y = max( max_y , bbox[ 3 ] )
            current_y += line_height

        # Trim the image if necessary. Without any drawn line the box above is
        # still inverted ( min > max ) and must not be handed to crop().
        if wrapped_text and ( max_x + crop_padding < max_width or max_y + crop_padding < max_height ):
            image = image.crop((
                max( 0 , min_x - crop_padding ) ,
                max( 0 , min_y - crop_padding ) ,
                min( max_width , max_x + crop_padding ) ,
                min( max_height , max_y + crop_padding )
            ))
        # Save the final image
        image.save( output_path )

    @staticmethod
    def _url_suffix( url ):
        """Return the file suffix ( including the dot ) of the path part of ``url``."""
        return Path( urllib.parse.urlparse( url ).path ).suffix

    def _collect_downloads( self , output_directory , messages ):
        """Walk ``messages`` , render text-only ones and list everything to download.

        Every produced file gets the next zero-padded running number as its
        name. Text-only messages are rendered immediately (when enabled) ;
        attachments and embed thumbnails are only collected.

        Returns:
            List of ``[ url , destination_path ]`` pairs for ``download_file``.
        """
        render_text = bool( self.config.get( "write_text_to_image" , False ) )
        zfill_number = len( str( len( messages ) ) ) + 1
        download_list = []
        item_total = 1
        for message in messages:
            if "attachments" not in message:
                continue
            attachments = message[ "attachments" ]
            if attachments:
                for attachment in attachments:
                    if "url" not in attachment or "filename" not in attachment:
                        pprint( message )
                        continue
                    suffix = Path( attachment[ "filename" ] ).suffix
                    download_list.append([
                        attachment[ "url" ] ,
                        output_directory.joinpath( f'{str( item_total ).zfill( zfill_number )}{suffix}' )
                    ])
                    item_total += 1
                continue
            embeds = message.get( "embeds" ) or []
            if not embeds:
                # Plain text message : optionally keep it as an image.
                content = message.get( "content" ) or ""
                if content and render_text:
                    output_path = output_directory.joinpath( f'{str( item_total ).zfill( zfill_number )}.png' )
                    self.write_text_to_image( content , output_path )
                    item_total += 1
                continue
            for embed in embeds:
                thumbnail = embed.get( "thumbnail" )
                if not thumbnail or "proxy_url" not in thumbnail:
                    continue
                # Take the extension from the URL path so query strings are ignored
                # and four-letter extensions such as "webp" are not truncated.
                suffix = self._url_suffix( thumbnail.get( "url" , thumbnail[ "proxy_url" ] ) )
                download_list.append([
                    thumbnail[ "proxy_url" ] ,
                    output_directory.joinpath( f'{str( item_total ).zfill( zfill_number )}{suffix}' )
                ])
                item_total += 1
        return download_list

    def download_all_message_attachments( self , output_directory , messages ):
        """Save attachments , embed thumbnails and rendered text of ``messages``.

        Args:
            output_directory: ``pathlib.Path`` of an existing directory.
            messages: List of message dicts as returned by ``get_channel_messages``.
        """
        download_list = self._collect_downloads( output_directory , messages )
        # pprint( download_list )
        if not download_list:
            return
        batch_process({
            "max_workers": 10 ,
            "batch_list": download_list ,
            "function_reference": download_file
        })

    def get_channel( self , channel_id ):
        """Fetch , print and return the channel object of ``channel_id``."""
        result = self._api_get( f"/channels/{channel_id}" )
        pprint( result )
        return result

    def archive_channel( self , channel_id , output_base_directory=False ):
        """Archive the messages and attachments of one channel.

        Args:
            channel_id: Id of the channel.
            output_base_directory: Target directory. When omitted ,
                ``<cwd>/<channel_id>-<channel name>`` is used.

        Raises:
            requests.exceptions.HTTPError: when an API request is rejected.
        """
        channel = self.get_channel( channel_id )
        if not output_base_directory:
            output_base_directory = Path.cwd().joinpath( f"{channel_id}-{channel[ 'name' ]}" )
        else:
            output_base_directory = Path( output_base_directory )
        output_base_directory.mkdir( parents=True , exist_ok=True )
        attachment_base_directory = output_base_directory.joinpath( "attachments" )
        # shutil.rmtree( str( attachment_base_directory ) , ignore_errors=True )
        attachment_base_directory.mkdir( parents=True , exist_ok=True )
        message_archive_save_path = output_base_directory.joinpath( "messages.json" )
        print( f"1.) Downloading Message Archive of {channel[ 'name' ]}" )
        messages = self.get_channel_messages( channel_id )
        write_json( str( message_archive_save_path ) , messages )
        print( f"saved {len(messages)} messages" )
        print( f"2.) Downloading Attachments from {channel[ 'name' ]}" )
        self.download_all_message_attachments( attachment_base_directory , messages )

    def archive_guild( self , guild_id , output_base_directory=False ):
        """Archive every channel of a guild into one directory per channel.

        A channel whose API requests are rejected is reported and skipped so the
        remaining channels are still archived. Failure to list the channels
        themselves still raises.

        Args:
            guild_id: Id of the guild.
            output_base_directory: Target directory. When omitted ,
                ``<cwd>/<guild_id>`` is used.
        """
        if not output_base_directory:
            output_base_directory = Path.cwd().joinpath( str( guild_id ) )
        else:
            output_base_directory = Path( output_base_directory )
        output_base_directory.mkdir( parents=True , exist_ok=True )
        channels = self.get_guild_channels( guild_id )
        total_channels = len( channels )
        for i , channel in enumerate( channels ):
            print( f"Processing Channel [ {i+1} ] of {total_channels}" )
            if "id" not in channel:
                continue
            channel_output_directory = output_base_directory.joinpath( f"{channel[ 'id' ]}-{channel[ 'name' ]}" )
            channel_output_directory.mkdir( parents=True , exist_ok=True )
            try:
                self.archive_channel( channel[ "id" ] , channel_output_directory )
            except requests.exceptions.HTTPError as e:
                # One rejected channel must not abort the whole guild archive.
                print( f"Skipping Channel {channel[ 'id' ]} : {e}" )
if __name__ == "__main__":
    # Example run. The token and the guild id are placeholders and the font
    # paths are machine specific ; adjust all of them before running.
    archiver_config = dict(
        token="asdf" ,
        write_text_to_image=True ,
        image_text_font_path="/Users/morpheous/WORKSPACE/PYTHON/DiscordChannelArchiver/fonts/comic_sans.ttf" ,
        fallback_font_path="/Users/morpheous/WORKSPACE/PYTHON/DiscordChannelArchiver/fonts/NotoColorEmoji-Regular.ttf" ,
        image_text_font_size=40 ,
        image_text_paragraph_size=100 ,
        image_background_color="black" ,
        image_text_fill_color="white" ,
        image_text_max_width=1900 ,
        image_text_max_height=800 ,
        image_text_left_padding=10 ,
    )
    bot = DisordChannelArchiverBot( archiver_config )
    # To list the guilds visible to the bot instead :
    # pprint( bot.get_guilds() )
    target_guild_id = "asdf"
    bot.archive_guild( target_guild_id )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment