Skip to content

Instantly share code, notes, and snippets.

@StellaSmith
Last active February 19, 2023 03:34
Show Gist options
  • Save StellaSmith/95ed88efa5c83dd78dc19ed982ee5ea3 to your computer and use it in GitHub Desktop.
Save StellaSmith/95ed88efa5c83dd78dc19ed982ee5ea3 to your computer and use it in GitHub Desktop.
Dump all of the attachments in a discord channel
# discord_dumper.py v1.1.2 Dump all of the attachments in a discord channel
# Copyright (C) 2022 Stella Smith
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
##############
# CHANGELOG: #
##############
# v1.0.1: use the file name format from history for listening messages
# v1.1.0: add -J argument to limit the number of download tasks
# v1.1.1: fix download for message listening
# v1.1.2: add download counter
def positive_int(var, clamp=None) -> int:
    """Parse *var* as a strictly positive integer, capped at an upper limit.

    Intended for use as an argparse ``type=`` converter.

    Args:
        var: Any value accepted by ``int()`` (typically the raw
            command-line string).
        clamp: Upper bound applied to the result. ``None`` (the default)
            keeps the historical cap of 512.

    Returns:
        ``int(var)``, or the cap when the parsed value exceeds it.

    Raises:
        ValueError: If *var* cannot be converted by ``int()`` or is not
            greater than zero.
    """
    value = int(var)
    if value <= 0:
        raise ValueError("must be a positive integer")
    # The parameter used to be ignored in favour of a hard-coded 512.
    limit = 512 if clamp is None else clamp
    return min(value, limit)
import argparse
import pathlib
import functools
# Command-line interface: an optional concurrency limit followed by the three
# required positionals (token, output directory, channel id).
parser = argparse.ArgumentParser(
    description="Dump all of the attachments in a discord channel",
)
parser.add_argument(
    "-J",
    dest="jobs",
    metavar="jobs",
    type=positive_int,
    default=512,
    help="Maximum number of concurrent downloads, clamped to 512",
)
parser.add_argument(
    "user_token",
    help="https://cancel.fm/ripcord/static/app_misc/discord_token_howto_en-US.png",
)
parser.add_argument("output_dir", type=pathlib.Path)
parser.add_argument("channel_id", type=int)
# Parsing happens at import time, before the heavier dependencies are loaded.
arguments = parser.parse_args()
import os
import asyncio
import sys
# Required third-party packages are imported defensively so that a missing
# package yields an installation hint on stderr and exit status 1.
try:
    import discord
except ModuleNotFoundError:
    import traceback
    traceback.print_exc()
    print("seems like discord.py is not installed", file=sys.stderr)
    print("try running the following command to install it", file=sys.stderr)
    print("\tpython -m pip install -U discord.py", file=sys.stderr)
    raise SystemExit(1)
try:
    import aiohttp
except ModuleNotFoundError:
    import traceback
    traceback.print_exc()
    # Name the package that is actually missing (this used to say discord.py).
    print("seems like aiohttp is not installed", file=sys.stderr)
    print("try running the following command to install it", file=sys.stderr)
    print("\tpython -m pip install -U aiohttp", file=sys.stderr)
    raise SystemExit(1)
# fallocate is optional: when it is absent the name is bound to None and
# callers are expected to check for that before using it.
try:
    import fallocate
except ModuleNotFoundError:
    fallocate = None
# Settings taken from the parsed command line.
OUTPUT: pathlib.Path = arguments.output_dir
CHANNEL: int = arguments.channel_id
TOKEN: str = arguments.user_token

# Sized from the -J option.
SEMAPHORE = asyncio.Semaphore(arguments.jobs)

# Progress counters; both start at zero.
DOWNLOADED = 0
TOTAL_DOWNLOAD = 0

# The argument-parsing names are not needed past this point.
del argparse
del arguments
del parser
async def download(session, url, filename):
    """Download *url* into ``OUTPUT / filename`` unless that file already exists.

    Updates the module-level ``TOTAL_DOWNLOAD`` / ``DOWNLOADED`` counters and
    prints progress lines to stdout. At most as many transfers as ``SEMAPHORE``
    allows run at the same time.

    Args:
        session: Object whose ``get(url)`` is an async context manager
            yielding a response with ``content_length`` and ``content``
            (this file passes an ``aiohttp.ClientSession``).
        url: Address of the attachment to fetch.
        filename: Name of the destination file, relative to ``OUTPUT``.

    Raises:
        Whatever opening the file or performing the request raises; the
        partially written file is removed before the error propagates.
    """
    global TOTAL_DOWNLOAD
    global DOWNLOADED
    target = OUTPUT / filename
    TOTAL_DOWNLOAD += 1
    if target.is_file():
        # Count the skipped file before printing so the progress line is
        # up to date, matching the order used when a download finishes.
        DOWNLOADED += 1
        print(f"download for {url} skipped, exists")
        print(f"{DOWNLOADED}/{TOTAL_DOWNLOAD}", end="\r")
        return
    async with SEMAPHORE:
        print(f"download for {url} started")
        print(f"{DOWNLOADED}/{TOTAL_DOWNLOAD}", end="\r")
        try:
            with open(target, "wb") as file:
                async with session.get(url) as response:
                    # Reserve the space up front when the size is known and
                    # the optional fallocate module is available.
                    if response.content_length is not None and fallocate is not None:
                        fallocate.fallocate(file, 0, response.content_length)
                    async for chunk in response.content.iter_any():
                        file.write(chunk)
        except BaseException:
            # Deliberately broad (covers cancellation and CTRL+C): never leave
            # a truncated file behind, since an existing file is treated as
            # already downloaded on the next run. The error is re-raised.
            try:
                os.unlink(target)
            except OSError:
                pass
            raise
        finally:
            DOWNLOADED += 1
        print(f"download for {url} finished")
        print(f"{DOWNLOADED}/{TOTAL_DOWNLOAD}", end="\r")
client = discord.Client()
# Shared HTTP session used for attachment downloads; created by on_ready.
session = None


@client.event
async def on_ready():
    """Scan the whole history of the configured channel and download attachments.

    Looks up ``CHANNEL``, exits with status 1 (message on stderr) when it is
    missing or not messageable, creates ``OUTPUT`` if needed, then schedules
    one ``download`` task per attachment and waits for all of them.
    Each file is named ``"<message id>-<index> <attachment filename>"``.
    """
    global session
    print(f"logged in as {client.user}")
    channel = client.get_channel(CHANNEL)
    if channel is None:
        # file= is required: a positional sys.stderr would just be printed
        # to stdout as part of the message.
        print(f"cannot find channel with id {CHANNEL}", file=sys.stderr)
        raise SystemExit(1)
    if not isinstance(channel, discord.abc.Messageable):
        print(
            f"channel {channel} ({channel.id}) is not a text channel ({type(channel)})",
            file=sys.stderr,
        )
        raise SystemExit(1)
    os.makedirs(OUTPUT, exist_ok=True)
    print(f"scanning {channel} history")
    # discord.py documents that on_ready may be dispatched more than once;
    # reuse the existing session instead of replacing it without closing it.
    if session is None:
        session = aiohttp.ClientSession()
    tasks = set()
    async for message in channel.history(limit=None):
        message: discord.Message
        for i, attachment in enumerate(message.attachments):
            task = asyncio.ensure_future(
                download(session, attachment.url, f"{message.id}-{i:03} {attachment.filename}")
            )
            tasks.add(task)
    await asyncio.gather(*tasks)
    print()
    print("finished scanning history, listening for new messages")
    print("you can press CTRL+C to stop")
    print()
@client.event
async def on_message(message: discord.Message):
    """Download the attachments of a new message posted in the watched channel.

    Messages from other channels are ignored. Files are named with the same
    ``"<message id>-<index> <attachment filename>"`` scheme as the history scan.
    """
    if message.channel.id != CHANNEL:
        return
    if session is None:
        # The HTTP session is created in on_ready; a message that arrives
        # earlier cannot be downloaded yet. NOTE(review): the history scan
        # started by on_ready is expected to pick such a message up - confirm.
        return
    await asyncio.gather(*(
        download(session, attachment.url, f"{message.id}-{i:03} {attachment.filename}")
        for i, attachment in enumerate(message.attachments)
    ))
# Run the client with the user token until it stops, then close the HTTP
# session if one was created.
try:
    client.run(TOKEN, bot=False)
finally:
    if session is not None:
        closing = session.close()
        asyncio.run(closing)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment