Last active
February 19, 2023 03:34
-
-
Save StellaSmith/95ed88efa5c83dd78dc19ed982ee5ea3 to your computer and use it in GitHub Desktop.
Dump all of the attachments in a discord channel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# discord_dumper.py v1.1.2 Dump all of the attachments in a discord channel | |
# Copyright (C) 2022 Stella Smith | |
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. | |
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. | |
# You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. | |
############## | |
# CHANGELOG: # | |
############## | |
# v1.0.1: use the file name format from history for listening messages | |
# v1.1.0: add -J argument to limit the number of download tasks | |
# v1.1.1: fix download for message listening | |
# v1.1.2: add download counter | |
def positive_int(var, clamp=None) -> int:
    """Parse *var* as a strictly positive integer and cap it at an upper bound.

    Used as an argparse ``type=`` callable, so invalid input is reported by
    raising ``ValueError``.

    Args:
        var: Anything ``int()`` accepts (typically a command-line string).
        clamp: Largest value that may be returned. ``None`` keeps the
            historical cap of 512.

    Returns:
        ``int(var)``, or the cap when ``int(var)`` exceeds it.

    Raises:
        ValueError: If *var* is not an integer or is less than or equal to 0.
    """
    # The parameter used to be ignored in favour of a hard-coded 512; keep 512
    # as the default so existing callers see the same results.
    upper = 512 if clamp is None else clamp
    value = int(var)
    if value <= 0:
        raise ValueError("must be a positive integer")
    return min(value, upper)
import argparse | |
import pathlib | |
import functools | |
# Command-line interface: one optional concurrency limit followed by three
# required positionals (token, output directory, channel id).
parser = argparse.ArgumentParser(
    description="Dump all of the attachments in a discord channel",
)
parser.add_argument(
    "-J",
    dest="jobs",
    metavar="jobs",
    default=512,
    type=positive_int,
    help="Maximum number of concurrent downloads, clamped to 512",
)
parser.add_argument(
    "user_token",
    help="https://cancel.fm/ripcord/static/app_misc/discord_token_howto_en-US.png",
)
parser.add_argument("output_dir", type=pathlib.Path)
parser.add_argument("channel_id", type=int)

# Parsed immediately at import time; the values are copied into module
# constants further down.
arguments = parser.parse_args()
import os | |
import asyncio | |
import sys | |
# discord.py is a hard requirement: show the original import error, explain
# how to install the package, and exit with status 1.
try:
    import discord
except ModuleNotFoundError:
    import traceback

    traceback.print_exc()
    for hint in (
        "seems like discord.py is not installed",
        "try running the following command to install it",
        "\tpython -m pip install -U discord.py",
    ):
        print(hint, file=sys.stderr)
    raise SystemExit(1)
# aiohttp is a hard requirement: show the original import error, explain how
# to install the package, and exit with status 1.
try:
    import aiohttp
except ModuleNotFoundError:
    import traceback

    traceback.print_exc()
    # The message previously named discord.py here, which sent users to
    # reinstall the wrong package.
    print("seems like aiohttp is not installed", file=sys.stderr)
    print("try running the following command to install it", file=sys.stderr)
    print("\tpython -m pip install -U aiohttp", file=sys.stderr)
    raise SystemExit(1)
# fallocate is optional: the name is bound to None when the module is not
# installed, and download() checks for that before using it.
fallocate = None
try:
    import fallocate
except ModuleNotFoundError:
    pass
# Values taken from the parsed command line.
TOKEN: str = arguments.user_token
OUTPUT: pathlib.Path = arguments.output_dir
CHANNEL: int = arguments.channel_id

# Entered by download() around each transfer; its size is the -J option.
SEMAPHORE = asyncio.Semaphore(arguments.jobs)

# Progress counters, updated by download() and printed as "done/total".
DOWNLOADED = 0
TOTAL_DOWNLOAD = 0

# The argument-parsing objects are not needed past this point.
del argparse, arguments, parser
async def download(session, url, filename):
    """Download *url* into ``OUTPUT / filename`` unless that file already exists.

    Updates the module-level ``TOTAL_DOWNLOAD`` / ``DOWNLOADED`` counters and
    prints a ``done/total`` progress line (terminated with ``\\r``) as it goes.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with ``content_length`` and
            ``content.iter_any()`` (an ``aiohttp.ClientSession`` in this file).
        url: Address of the attachment to fetch.
        filename: File name, relative to ``OUTPUT``, to write the data to.

    Raises:
        Whatever opening the file or the HTTP transfer raises; the partially
        written file is removed before the exception propagates.
    """
    global TOTAL_DOWNLOAD
    global DOWNLOADED

    target = OUTPUT / filename
    TOTAL_DOWNLOAD += 1

    if target.is_file():
        # Count the skipped file before printing so the progress line already
        # includes it, matching what the "finished" path below does.
        DOWNLOADED += 1
        print(f"download for {url} skipped, exists")
        print(f"{DOWNLOADED}/{TOTAL_DOWNLOAD}", end="\r")
        return

    # The semaphore bounds how many transfers are in flight at once.
    async with SEMAPHORE:
        print(f"download for {url} started")
        print(f"{DOWNLOADED}/{TOTAL_DOWNLOAD}", end="\r")
        try:
            with open(target, "wb") as file:
                async with session.get(url) as response:
                    # When the size is known and the optional fallocate module
                    # is available, hand it the file and the expected length
                    # (presumably to reserve the space up front).
                    if response.content_length is not None and fallocate is not None:
                        fallocate.fallocate(file, 0, response.content_length)
                    async for chunk in response.content.iter_any():
                        file.write(chunk)
        except BaseException:
            # BaseException on purpose: cancellation and CTRL+C must also
            # remove the partial file, otherwise the next run would treat it
            # as already downloaded and skip it.
            try:
                os.unlink(target)
            except OSError:
                # Best effort only; the original error is what matters.
                pass
            raise
        finally:
            # Runs on success and on failure, so the counter always advances.
            DOWNLOADED += 1
            print(f"download for {url} finished")
            print(f"{DOWNLOADED}/{TOTAL_DOWNLOAD}", end="\r")
# Shared HTTP session; stays None until on_ready assigns it.
session = None

# Discord client whose event handlers are registered below.
client = discord.Client()
@client.event
async def on_ready():
    """Scan the whole history of the target channel and download its attachments.

    Exits the program with status 1 when the channel cannot be found or is not
    messageable. Creates the output directory and the shared HTTP session,
    starts one download task per attachment, and waits for all of them.
    """
    global session
    print(f"logged in as {client.user}")

    channel = client.get_channel(CHANNEL)
    if channel is None:
        # file= is required; passing sys.stderr positionally just printed the
        # stream object's repr to stdout.
        print(f"cannot find channel with id {CHANNEL}", file=sys.stderr)
        raise SystemExit(1)
    if not isinstance(channel, discord.abc.Messageable):
        print(f"channel {channel} ({channel.id}) is not a text channel ({type(channel)})", file=sys.stderr)
        raise SystemExit(1)

    os.makedirs(OUTPUT, exist_ok=True)
    print(f"scanning {channel} history")

    # discord.py may dispatch on_ready more than once (e.g. after a failed
    # resume); reuse the existing session instead of leaking an open one.
    if session is None:
        session = aiohttp.ClientSession()

    tasks = []
    async for message in channel.history(limit=None):
        message: discord.Message
        for i, attachment in enumerate(message.attachments):
            # File name: message id, attachment index, original file name.
            name = f"{message.id}-{i:03} {attachment.filename}"
            tasks.append(asyncio.ensure_future(download(session, attachment.url, name)))
    await asyncio.gather(*tasks)

    print()
    print("finished scanning history, listening for new messages")
    print("you can press CTRL+C to stop")
    print()
@client.event
async def on_message(message: discord.Message):
    """Download the attachments of a message newly posted in the target channel.

    Messages from other channels are ignored. Files are named the same way as
    in the history scan: message id, attachment index, original file name.
    """
    if message.channel.id != CHANNEL:
        return
    if session is None:
        # on_ready has not created the HTTP session yet, so there is nothing
        # to download with. on_ready creates the session before it starts
        # reading the channel history, so a message that arrives this early
        # should still be picked up by that scan.
        return

    tasks = [
        asyncio.ensure_future(
            download(session, attachment.url, f"{message.id}-{i:03} {attachment.filename}")
        )
        for i, attachment in enumerate(message.attachments)
    ]
    await asyncio.gather(*tasks)
# Entry point: block inside the Discord client until it stops, then release
# the HTTP session if on_ready ever got as far as creating one.
try:
    client.run(TOKEN, bot=False)
finally:
    if session is not None:
        asyncio.run(session.close())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment