Skip to content

Instantly share code, notes, and snippets.

@typemytype
Last active January 26, 2023 17:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save typemytype/d056992b1a68b84f6abc3b458d77bad1 to your computer and use it in GitHub Desktop.
Save typemytype/d056992b1a68b84f6abc3b458d77bad1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
based on
https://github.com/andrejbauer/slack-to-discord
!slackpreview
!slackimport
"""
import discord
from discord.ext import commands
import datetime
import json
import argparse
import sys
import re
import os
import urllib.request
arg_parser = argparse.ArgumentParser(description="A Discord bot for transferring a Slack archive to a channel")
arg_parser.add_argument('--prefix', dest='prefix', default='!', help='bot command prefix (default !)')
arg_parser.add_argument('--token', required=True, help='bot access token')
arg_parser.add_argument('--users', required=True, type=argparse.FileType('r'), help='slack users.json file')
arg_parser.add_argument('file', nargs='+')
# Parse the command-line
args = arg_parser.parse_args(sys.argv[1:])
# Load user data
users = {}
for u in json.load(args.users):
if not u['deleted']:
users[u['id']] = u['real_name']
# Process the input files
messagesMap = dict()
messages = []
for fn in args.file:
downloadFolder = os.path.join(os.path.dirname(fn), "_downloads")
if not os.path.exists(downloadFolder):
os.mkdir(downloadFolder)
with open(fn, "rb") as fh:
for msg in json.load(fh):
# Unfold mentions
txt = re.sub(r'<@(\w+)>',
(lambda m: '@' + users.get(m.group(1), 'Unknown')),
msg["text"])
# Unescape HTML characters
txt = re.sub(r'&gt;', '>', txt)
txt = re.sub(r'&lt;', '<', txt)
txt = re.sub(r'&amp;', '&', txt)
# ignore
if txt.endswith(" has joined the channel"):
continue
if txt.endswith(" has left the channel"):
continue
if txt.startswith("added an integration to this channel: "):
continue
if txt.startswith("set the channel description: "):
continue
if txt == "This content can't be displayed.":
# its a release
txt = ""
for block in msg.get("blocks", []):
content = block.get("text", {})
txt += content.get("text", "")
txt = re.sub(r"\\\/", "/", txt)
txt = re.sub(r"<http:", "http:", txt)
txt = re.sub(r"\.dmg\| ([0-9]+)>", ".dmg ", txt)
# extract image, files, not docs
if "files" in msg:
msg["localFiles"] = []
for file in msg["files"]:
if file["mode"] != "docs" and "url_private_download" in file:
downloadPath = os.path.join(downloadFolder, f"{file['id']}_{file['name']}")
if not os.path.exists(downloadPath):
urllib.request.urlretrieve(file["url_private_download"], downloadPath)
msg["localFiles"].append(dict(path=downloadPath, filename=file['name']))
messagesMap[msg["ts"]] = msg
# Split messages longer than 2000 characters
while len(txt) > 0:
msg["text"] = txt[:2000]
txt = txt[2000:]
messages.append(msg)
# Sort the messages by timestamp
messages.sort(key=(lambda msg: msg['ts']))
print("Read {0} messages.".format(len(messages)))
# Create the bot
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix=args.prefix, intents=intents)
print("Activating the bot. Press Ctrl-C to exit.")
def getFiles(msg):
files = None
if "localFiles" in msg:
files = []
for file in msg["localFiles"]:
files.append(discord.File(file["path"], filename=file["filename"]))
return files
def format_message(msg):
"""Format the given message in Markdown, suitable for posting on Discord."""
return "{timestamp} **{user}**: {text}".format(
timestamp = datetime.datetime.fromtimestamp(float(msg['ts'])).strftime('%Y-%m-%d %H:%M'),
user=users.get(msg.get('user'), 'Unknown'),
text=msg['text'])
# Set up the bot listener
@bot.command(pass_context=True)
async def slackimport(ctx):
n = len(messages)
k = 0
print ("Sending {0} messages ...".format(n))
for msg in messages:
k = k + 1
if k % 10 == 0:
print("{0}/{1} messages sent ...".format(k, n))
try:
if "replies" in msg:
threadName = msg['text']
if len(threadName) > 100:
threadName = threadName[:96] + "..."
# thread = await ctx.channel.create_thread(name=threadName, type=discord.ChannelType.public_thread)
# await thread.send(format_message(msg), files=getFiles(msg))
message = await ctx.send(format_message(msg), files=getFiles(msg))
thread = await message.create_thread(name=threadName)
# its a thread
for reply in msg["replies"]:
replyMessage = messagesMap.get(reply["ts"])
if replyMessage:
await thread.send(format_message(replyMessage), files=getFiles(replyMessage))
elif "thread_ts" not in msg:
# Send the message to Discord (Markdown format)
await ctx.send(format_message(msg), files=getFiles(msg))
except Exception as e:
print(f"Message {k} could not be sent: {e}")
print("Finished sending messages. My job is done, kill me now.")
@bot.command(pass_context=True)
async def slackpreview(ctx):
for msg in messages:
print("-" * 50)
print(format_message(msg))
for file in msg.get("localFiles", []):
print(f" {file['filename']} ({file['path']})")
@bot.command(pass_context=True)
async def slackexit(ctx):
print("Logging out ...")
await bot.logout()
print("Stopping (do not worry about the error messages printed below) ...")
exit(0)
# Run the bot
bot.run(args.token)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment