Created
August 12, 2021 12:46
-
-
Save cnorthwood/07c9d3b8c99d8a6e87cdcc78954cdf6e to your computer and use it in GitHub Desktop.
Converting Slack export to CSV
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from collections import namedtuple | |
import csv | |
from datetime import datetime | |
from itertools import chain | |
import json | |
import os.path | |
import pathlib | |
import re | |
# One output row of the CSV: ISO timestamp, display channel name, author name,
# and the message text with Slack id markup already resolved.
SlackMessage = namedtuple("SlackMessage", "timestamp channel author message")

# User mention markup, e.g. "<@U024BE7LH>"; captures the user id.
USER_RE = re.compile(r'<@(?P<user_id>\w+)>')

# Channel link markup, e.g. "<#C024BE91L|team-dev>"; captures the readable name.
# The name class includes "-" because hyphenated channel names would otherwise
# not match at all and the raw markup would be left in the output.
CHANNEL_RE = re.compile(r'<#\w+\|(?P<channel_name>[\w-]+)>')
def slack_ts(ts):
    """Convert a Slack ``ts`` string into a naive UTC ``datetime``.

    Args:
        ts: Timestamp text of the form ``"<seconds>"`` or
            ``"<seconds>.<fraction>"``, where ``<seconds>`` is a Unix epoch
            value and ``<fraction>`` is a decimal fraction of a second.

    Returns:
        A timezone-naive ``datetime`` expressed in UTC.

    Raises:
        ValueError: If either part of ``ts`` is not a decimal integer.
    """
    # Imported here so this function does not depend on the file-level
    # ``from datetime import ...`` line being extended.
    from datetime import timezone

    seconds, _, fraction = ts.partition(".")
    # Normalise the fraction to exactly six digits so that it is always read
    # as microseconds: "5" -> 500000, "000200" -> 200, "" -> 0.
    microseconds = int(fraction[:6].ljust(6, "0"))
    # Build an aware UTC value, then drop tzinfo to keep the naive result
    # (and therefore the same isoformat() text) callers already rely on.
    moment = datetime.fromtimestamp(int(seconds), tz=timezone.utc)
    return moment.replace(tzinfo=None, microsecond=microseconds)
def get_user_names(users_filename):
    """Load a users JSON file and map each user id to its name.

    Args:
        users_filename: Path to a JSON file holding a list of objects, each
            with ``"id"`` and ``"name"`` keys.

    Returns:
        A dict of ``{user id: user name}``.
    """
    # Read explicitly as UTF-8: relying on the platform default encoding
    # corrupts or rejects non-ASCII names on systems where it is not UTF-8.
    with open(users_filename, "r", encoding="utf-8") as users_file:
        users_json = json.load(users_file)
    return {user["id"]: user["name"] for user in users_json}
def get_dms(dms_filename, users):
    """Load a DMs JSON file and build a readable label for each conversation.

    Args:
        dms_filename: Path to a JSON file holding a list of objects, each
            with an ``"id"`` and a ``"members"`` list of user ids.
        users: Mapping of user id to user name; ids missing from it are
            shown as the raw id.

    Returns:
        A dict of ``{conversation id: "DM between <name>, <name>, ..."}``.
    """
    # Read explicitly as UTF-8 rather than the platform default encoding.
    with open(dms_filename, "r", encoding="utf-8") as dms_file:
        dms_json = json.load(dms_file)

    labels = {}
    for dm in dms_json:
        member_names = ", ".join(users.get(member, member) for member in dm["members"])
        labels[dm["id"]] = f"DM between {member_names}"
    return labels
def get_private_groups(mpims_filename, users):
    """Load a group-DM JSON file and build a readable label for each group.

    Unlike ``get_dms``-style files, the result is keyed by each entry's
    ``"name"`` rather than its ``"id"``.

    Args:
        mpims_filename: Path to a JSON file holding a list of objects, each
            with a ``"name"`` and a ``"members"`` list of user ids.
        users: Mapping of user id to user name; ids missing from it are
            shown as the raw id.

    Returns:
        A dict of ``{group name: "DM between <name>, <name>, ..."}``.
    """
    # Read explicitly as UTF-8 rather than the platform default encoding.
    with open(mpims_filename, "r", encoding="utf-8") as mpims_file:
        mpims_json = json.load(mpims_file)

    labels = {}
    for mpim in mpims_json:
        member_names = ", ".join(users.get(member, member) for member in mpim["members"])
        labels[mpim["name"]] = f"DM between {member_names}"
    return labels
def replace_slack_ids(text, users):
    """Rewrite Slack id markup in ``text`` into readable ``@name`` / ``#channel`` form.

    User mentions matched by ``USER_RE`` become ``@<name>``, falling back to
    the raw id when it is not in ``users``. Channel links matched by
    ``CHANNEL_RE`` become ``#<channel name>``.
    """
    def _mention(match):
        user_id = match.group("user_id")
        return f"@{users.get(user_id, user_id)}"

    def _channel(match):
        return f"#{match.group('channel_name')}"

    # User mentions are substituted first, then channel links.
    with_users = USER_RE.sub(_mention, text)
    return CHANNEL_RE.sub(_channel, with_users)
def get_messages(channel_path, users, dms):
    """Yield a ``SlackMessage`` for every visible message in one conversation directory.

    Args:
        channel_path: Directory containing JSON files, each holding a list
            of message objects.
        users: Mapping of user id to user name.
        dms: Mapping of directory name to a display label; directories not
            in it are shown as ``#<directory name>``.

    Yields:
        ``SlackMessage(timestamp, channel, author, message)`` for each
        message whose ``"hidden"`` flag is not set. The author is
        ``"unknown"`` when the message has no ``"user"`` or the id is not
        in ``users``.
    """
    dir_name = os.path.basename(channel_path)
    channel_name = dms.get(dir_name, f"#{dir_name}")

    # Sorted so the files are read in a stable order on every platform.
    for day_filename in sorted(os.listdir(channel_path)):
        # Skip stray non-JSON files (e.g. ".DS_Store") instead of crashing
        # in json.load.
        if not day_filename.endswith(".json"):
            continue
        # Read explicitly as UTF-8: message text routinely contains emoji and
        # other non-ASCII characters that the platform default may reject.
        with open(os.path.join(channel_path, day_filename), "r", encoding="utf-8") as day_file:
            day_json = json.load(day_file)
        for message in day_json:
            if message.get("hidden", False):
                continue
            yield SlackMessage(
                slack_ts(message["ts"]).isoformat(),
                channel_name,
                users.get(message.get("user"), "unknown"),
                # A missing or null "text" is exported as an empty message
                # rather than aborting the whole run.
                replace_slack_ids(message.get("text") or "", users),
            )
def main(root):
    """Convert the export under ``root`` into ``messages.csv``.

    Reads ``users.json`` from ``root``, plus ``dms.json`` and ``mpims.json``
    when they are present, collects the messages from every subdirectory of
    ``root``, sorts them by timestamp, and writes them with a header row to
    ``messages.csv`` in the current working directory.

    Args:
        root: Path of the unpacked export directory.
    """
    users = get_user_names(os.path.join(root, "users.json"))

    # The DM and group-DM index files are optional: an export holding only
    # channel directories may not include them, and that should not abort
    # the run.
    dms = {}
    for filename, loader in (("dms.json", get_dms), ("mpims.json", get_private_groups)):
        index_path = os.path.join(root, filename)
        if os.path.exists(index_path):
            # update() tolerates a key present in both files, where
            # dict(**a, **b) would raise TypeError.
            dms.update(loader(index_path, users))

    with os.scandir(root) as entries:
        # sorted() drains the generators while the scandir handle is open.
        messages = sorted(
            chain.from_iterable(
                get_messages(entry.path, users, dms)
                for entry in entries
                if entry.is_dir()
            ),
            key=lambda m: m.timestamp,
        )

    # Written explicitly as UTF-8 so non-ASCII message text cannot raise
    # UnicodeEncodeError under a narrower platform default encoding.
    with open("messages.csv", "w", newline="", encoding="utf-8") as messages_file:
        messages_csv = csv.writer(messages_file)
        messages_csv.writerow(["Timestamp", "Channel", "Author", "Message"])
        messages_csv.writerows(messages)
# Run the export only when executed as a script, so that importing this
# module (e.g. to reuse its helpers) has no side effects.
if __name__ == "__main__":
    # The directory containing this script is treated as the export root.
    main(pathlib.Path(__file__).parent.resolve())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment