Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@RobertKrajewski
Last active April 13, 2024 10:46
Show Gist options
  • Star 38 You must be signed in to star a gist
  • Fork 21 You must be signed in to fork a gist
  • Save RobertKrajewski/5847ce49333062ea4be1a08f2913288c to your computer and use it in GitHub Desktop.
Save RobertKrajewski/5847ce49333062ea4be1a08f2913288c to your computer and use it in GitHub Desktop.
This script allows you to export the content (text + files) of an interactively selected Mattermost channel (public, private, group, direct message) to files. Tested on Mattermost 5.27 using Python 3.7.
import os
import sqlite3
from datetime import datetime, date
from typing import Tuple, Dict, List
import getpass
from mattermostdriver import Driver
import pathlib
import json
def connect(host: str, login_token: str = None, username: str = None, password: str = None,
            port: int = 443) -> Driver:
    """Create a Mattermost driver for the given server and log in.

    :param host: Server address passed to the driver as "url".
    :param login_token: Session/access token; may be None when logging in by password.
    :param username: Account name used for password login; may be None when a token is used.
    :param password: Password belonging to ``username``; may be None when a token is used.
    :param port: Port of the Mattermost server (defaults to 443, the value previously hard-coded).
    :return: The driver after ``login()`` has been called on it.
    """
    d = Driver({
        "url": host,
        "port": port,
        "token": login_token,
        # The driver expects the account name under "login_id"; a "username" key is not
        # one of its options, which made password logins fail.
        "login_id": username,
        "password": password,
    })
    d.login()
    return d
def get_users(d: Driver) -> Tuple[Dict[str, str], str]:
    """Look up the logged-in user and build a user-id -> username table.

    :param d: Logged-in driver.
    :return: Tuple of (mapping from user id to username, id of the logged-in user).
    """
    me = d.users.get_user("me")
    my_username, my_user_id = me["username"], me["id"]
    print(f"Successfully logged in as {my_username} ({my_user_id})")

    # Usernames are collected because they are used instead of the raw user ids later on
    print("Downloading all user information... ", end="")
    user_id_to_name = {}
    page = 0
    batch = d.users.get_users(params={"per_page": 200, "page": page})
    # An empty page marks the end of the user list
    while len(batch) != 0:
        user_id_to_name.update((user["id"], user["username"]) for user in batch)
        page += 1
        batch = d.users.get_users(params={"per_page": 200, "page": page})

    print(f"Found {len(user_id_to_name)} users!")
    return user_id_to_name, my_user_id
def select_team(d: Driver, my_user_id: str) -> dict:
    """List the teams of a user and let the user pick one interactively.

    The prompt is repeated until a valid index is entered instead of crashing on a typo.

    :param d: Logged-in driver.
    :param my_user_id: Id of the user whose teams are listed.
    :return: The selected team as returned by the server (a dict with at least "name" and "id").
    :raises IndexError: If the user is not a member of any team, so nothing can be selected.
    """
    print("Downloading all team information... ", end="")
    teams = d.teams.get_user_teams(my_user_id)
    print(f"Found {len(teams)} teams!")
    if not teams:
        # Without this guard the prompt below could never be answered validly
        raise IndexError("No teams available to select from")

    for i_team, team in enumerate(teams):
        print(f"{i_team}\t{team['name']}\t({team['id']})")

    while True:
        raw_idx = input("Select team by idx: ").strip()
        try:
            team_idx = int(raw_idx)
        except ValueError:
            print(f"'{raw_idx}' is not a number, please try again")
            continue
        if 0 <= team_idx < len(teams):
            break
        print(f"Index must be between 0 and {len(teams) - 1}, please try again")

    team = teams[team_idx]
    print(f"Selected team {team['name']}")
    return team
def select_channel(d: Driver, team: dict, my_user_id: str, user_id_to_name: Dict[str, str],
                   verbose: bool = False) -> List[dict]:
    """List the user's channels in a team and let the user pick one, several or all of them.

    Direct-message channels get a "display_name" set to the username of the other participant.
    User ids that are missing from ``user_id_to_name`` are looked up on the server and cached
    in that dict; if the server does not know the user, the raw id is used as the name.

    :param d: Logged-in driver.
    :param team: Selected team; only its "id" is used.
    :param my_user_id: Id of the logged-in user.
    :param user_id_to_name: Mapping from user id to username (may be extended by this function).
    :param verbose: If True, the channel id is printed next to each channel name.
    :return: The selected channels (dicts as returned by the server), sorted by display name.
    """
    # Function-scope import so this block works without touching the file-level imports
    from mattermostdriver.exceptions import ResourceNotFound

    print("Downloading all channel information... ", end="")
    channels = d.channels.get_channels_for_user(my_user_id, team["id"])

    # Add display name to direct messages
    for channel in channels:
        if channel["type"] != "D":
            continue

        # The channel name consists of two user ids connected by a double underscore
        user_ids = channel["name"].split("__")
        other_user_id = user_ids[1] if user_ids[0] == my_user_id else user_ids[0]
        if other_user_id not in user_id_to_name:
            # The conversation partner was not part of the downloaded user list
            try:
                user_id_to_name[other_user_id] = d.users.get_user(other_user_id)["username"]
            except ResourceNotFound:
                user_id_to_name[other_user_id] = other_user_id
        channel["display_name"] = user_id_to_name[other_user_id]

    # Sort channels by name for easier search
    channels = sorted(channels, key=lambda x: x["display_name"].lower())
    print(f"Found {len(channels)} channels!")
    if not channels:
        # Nothing to choose from, so there is no point in prompting
        return []

    for i_channel, channel in enumerate(channels):
        channel_id = f"\t({channel['id']})" if verbose else ""
        print(f"{i_channel}\t{channel['display_name']}{channel_id}")

    # Ask until the answer is 'all' or a non-empty list of valid indices
    while True:
        channel_input = input(
            "Select channels by idx separated by comma or type 'all' for downloading all channels: ").strip()
        if channel_input == "all":
            channel_idxs = list(range(len(channels)))
            break

        # Ignore blanks and empty entries such as a trailing comma
        entries = [entry for entry in channel_input.replace(" ", "").split(",") if entry]
        try:
            channel_idxs = [int(entry) for entry in entries]
        except ValueError:
            print("Invalid selection: only numbers separated by comma or 'all' are accepted")
            continue
        if channel_idxs and all(0 <= idx < len(channels) for idx in channel_idxs):
            break
        print(f"Invalid selection: indices must be between 0 and {len(channels) - 1}")

    selected_channels = [channels[idx] for idx in channel_idxs]
    print("Selected channel(s):", ", ".join([channel["display_name"] for channel in selected_channels]))
    return selected_channels
# Marker returned by _download_file when every attempt failed (None could be a parsed JSON null)
_DOWNLOAD_FAILED = object()


def _fetch_all_posts(d: Driver, channel_id: str) -> list:
    """Download all posts of a channel, 200 per request, until an empty page is returned."""
    page = 0
    all_posts = []
    while True:
        print(f"Requesting channel page {page}")
        posts = d.posts.get_posts_for_channel(channel_id, params={"per_page": 200, "page": page})
        if len(posts["posts"]) == 0:
            # If no posts are returned, we have reached the end
            break
        all_posts.extend(posts["posts"][post_id] for post_id in posts["order"])
        page += 1
    return all_posts


def _download_file(d: Driver, file_id: str, max_attempts: int = 3):
    """Fetch an attachment with a bounded number of retries; return _DOWNLOAD_FAILED on failure."""
    for _ in range(max_attempts):
        try:
            return d.files.get_file(file_id)
        except Exception:
            # "Exception" (not a bare except) keeps Ctrl+C working during retries
            print("Downloading file failed")
    return _DOWNLOAD_FAILED


def _write_code_block(message: str, target: pathlib.Path) -> None:
    """Write the text between the first and the last ``` marker of a message to a file."""
    start_pos = message.find("```") + 3
    end_pos = message.rfind("```")
    cut = message[start_pos:end_pos]
    if not len(cut):
        print("Code has no length")
        return
    with open(target, "wb") as f:
        f.write(cut.encode())


def export_channel(d: Driver, channel: dict, user_id_to_name: Dict[str, str], output_base: str,
                   download_files: bool = True, before: str = None, after: str = None):
    """Export all posts (and optionally the attached files) of one channel to a directory.

    A sub directory named after the channel is created below ``output_base``. It receives one
    JSON file with channel information and the simplified posts, one text file per post that
    contains a code block, and the downloaded attachments.

    :param d: Logged-in driver.
    :param channel: Channel dict as returned by the server ("display_name", "name", "header",
                    "id" and, if present, "team_id" are used).
    :param user_id_to_name: Mapping from user id to username; unknown ids are looked up and added.
    :param output_base: Base directory for the export.
    :param download_files: If True, attachments are downloaded; their names are listed either way.
    :param before: Optional date "YYYY-MM-DD"; posts created after that moment are skipped.
    :param after: Optional date "YYYY-MM-DD"; posts created before that moment are skipped.
    """
    # Function-scope imports so this block works without touching the file-level imports
    from datetime import timezone
    from mattermostdriver.exceptions import ResourceNotFound

    # Sanitize channel name
    channel_name = channel["display_name"].replace("\\", "").replace("/", "")
    print("Exporting channel", channel_name)

    # The naive datetimes produced by strptime are interpreted as local time by timestamp()
    if after:
        after = datetime.strptime(after, '%Y-%m-%d').timestamp()
    if before:
        before = datetime.strptime(before, '%Y-%m-%d').timestamp()

    # Get all posts for selected channel
    all_posts = _fetch_all_posts(d, channel["id"])
    print(f"Found {len(all_posts)} posts")

    # Create output directory
    output_base = pathlib.Path(output_base) / channel_name
    output_base.mkdir(parents=True, exist_ok=True)

    # Simplify all posts to contain only username, date, message and files in chronological order
    simple_posts = []
    for i_post, post in enumerate(reversed(all_posts)):
        # Filter posts by date range ("create_at" is in milliseconds)
        created = post["create_at"] / 1000
        if (before and created > before) or (after and created < after):
            continue

        user_id = post["user_id"]
        if user_id not in user_id_to_name:
            try:
                user_id_to_name[user_id] = d.users.get_user(user_id)["username"]
            except ResourceNotFound:
                # The author no longer exists on the server; fall back to the raw id
                user_id_to_name[user_id] = user_id
        username = user_id_to_name[user_id]

        created = datetime.fromtimestamp(post["create_at"] / 1000, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        message = post["message"]
        simple_post = dict(idx=i_post, id=post["id"], created=created, username=username, message=message)

        # If a code block is given in the message, dump it to file
        if message.count("```") > 1:
            _write_code_block(message, output_base / ("%03d" % i_post + "_code.txt"))

        # If any files are attached to the message, download each
        metadata = post.get("metadata") or {}
        if "files" in metadata:
            filenames = []
            for file in metadata["files"]:
                if download_files:
                    filename = "%03d" % i_post + "_" + file["name"]
                    print("Downloading", file["name"])
                    resp = _download_file(d, file["id"])
                    if resp is _DOWNLOAD_FAILED:
                        print(f"Giving up on '{file['name']}', continuing without it")
                    elif hasattr(resp, "content"):
                        # Raw response object: store the bytes unchanged
                        with open(output_base / filename, "wb") as f:
                            f.write(resp.content)
                    else:
                        # Mattermost Driver unfortunately parses json files (to dict, list, ...)
                        with open(output_base / filename, "w") as f:
                            json.dump(resp, f)
                filenames.append(file["name"])
            simple_post["files"] = filenames
        simple_posts.append(simple_post)

    # Channels without a team have an empty team id, which cannot be looked up
    team_id = channel.get("team_id", "")
    team_name = ""
    if team_id:
        try:
            team_name = d.teams.get_team(team_id)["name"]
        except ResourceNotFound:
            team_name = team_id

    output = {
        "channel": {
            "name": channel["name"],
            "display_name": channel["display_name"],
            "header": channel["header"],
            "id": channel["id"],
            "team": team_name,
            "team_id": team_id,
            # The trailing "Z" denotes UTC, so the timestamp has to be taken in UTC as well
            "exported_at": datetime.now(tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        },
        "posts": simple_posts
    }

    # Export posts to json file
    filtered_channel_name = ''.join(filter(lambda ch: ch not in "?!/\\.;:*\"<>|", channel_name))
    output_filename = filtered_channel_name + ".json"
    output_filepath = output_base / output_filename
    with open(output_filepath, "w", encoding='utf8') as f:
        json.dump(output, f, indent=2, ensure_ascii=False)
    print(f"Exported channel data to '{output_filepath}'")
def get_config_from_json(config_filename: str = "config.json") -> dict:
    """Read the configuration from a JSON file.

    :param config_filename: Path of the JSON config file.
    :return: The parsed file content, or an empty dict if the file does not exist.
    """
    config_path = pathlib.Path(config_filename)
    if config_path.exists():
        with config_path.open() as config_file:
            return json.load(config_file)
    # No config file yet: start with an empty configuration
    return {}
def _ask_yes_no(prompt: str) -> bool:
    """Repeat the prompt until the user answers 'y' or 'n'; return True for 'y'."""
    answer = ""
    while answer not in ("y", "n"):
        answer = input(prompt)
    return answer == "y"


def complete_config(config: dict, config_filename: str = "config.json") -> dict:
    """Interactively ask for every setting that is missing from the config.

    Settings handled: "host", "login_mode" ("password" or "token"), "username" or "token"
    depending on the login mode, and "download_files". If anything was added, the user is
    offered to store the config in ``config_filename``. The password is always requested
    interactively and is added to the returned dict only after the optional save, so it is
    never written to the file.

    :param config: Config loaded so far; it is modified in place and returned.
    :param config_filename: File the completed config may be written to.
    :return: The completed config including the "password" entry (None in token mode).
    """
    config_changed = False

    if config.get("host", False):
        print(f"Using host '{config['host']}' from config")
    else:
        config["host"] = input("Please input host/server address (without https://): ")
        config_changed = True

    if config.get("login_mode", False):
        print(f"Using login mode '{config['login_mode']}' from config")
    else:
        login_mode = ""
        while login_mode not in ["password", "token"]:
            login_mode = input("Please input login_mode 'password' or 'token' (=Gitlab Oauth): ")
        config["login_mode"] = login_mode
        config_changed = True

    password = None
    if config["login_mode"] == "password":
        if config.get("username", False):
            print(f"Using username '{config['username']}' from config")
        else:
            config["username"] = input("Please input your username: ")
            config_changed = True

        password = getpass.getpass("Enter your password (hidden): ")
    else:
        if config.get("token", False):
            # NOTE(review): this echoes the secret token to the terminal
            print(f"Using token '{config['token']}' from config")
        else:
            print("Are you logged-in into Mattermost using the Firefox Browser? "
                  "If so, token may be automatically extracted")
            token = None
            if _ask_yes_no("Try to find token automatically? y/n: "):
                token = find_mmauthtoken_firefox(config["host"])
            # Ask manually both when the user declined and when the automatic search found
            # nothing; previously a failed search silently left the token unset.
            if not token:
                token = input("Please input your login token (MMAUTHTOKEN): ")
            config["token"] = token
            config_changed = True

    if "download_files" in config:
        print(f"Download files set to '{config['download_files']}' from config")
    else:
        config["download_files"] = _ask_yes_no("Should files be downloaded? y/n: ")
        config_changed = True

    if config_changed:
        if _ask_yes_no("Config changed! Would you like to store your config (without password) to file? y/n: "):
            with open(config_filename, "w") as f:
                json.dump(config, f, indent=2)
            print(f"Stored new config to {config_filename}")

    # Added after saving so that the password never ends up in the config file
    config["password"] = password

    return config
def find_mmauthtoken_firefox(host):
    """Search the Firefox cookie databases for a Mattermost login token (MMAUTHTOKEN).

    On Windows the profiles below %APPDATA% are searched. If APPDATA is not set, the default
    Firefox profile locations below the home directory on Linux and macOS are tried instead
    of failing. Cookie databases that cannot be read are reported and skipped.

    :param host: Host name; only cookies whose host contains this string are considered.
    :return: The value of the first matching cookie, or None if no token was found.
    """
    appdata = os.environ.get("APPDATA")
    if appdata:
        profile_dirs = [pathlib.Path(appdata) / "Mozilla/Firefox/Profiles"]
    else:
        home = pathlib.Path(os.path.expanduser("~"))
        profile_dirs = [
            home / ".mozilla/firefox",  # Linux
            home / "Library/Application Support/Firefox/Profiles",  # macOS
        ]

    all_tokens = []
    for profiles_dir in profile_dirs:
        if not profiles_dir.is_dir():
            continue
        for cookie_file in profiles_dir.rglob("cookies.sqlite"):
            print(f"Opening {cookie_file}")
            try:
                connection = sqlite3.connect(str(cookie_file))
                try:
                    rows = connection.execute(
                        "SELECT host, value FROM moz_cookies WHERE name = 'MMAUTHTOKEN'").fetchall()
                finally:
                    # Always release the database file, even if the query failed
                    connection.close()
            except sqlite3.Error as err:
                # E.g. the file is locked or is not a valid database; try the next profile
                print(f"Could not read {cookie_file}: {err}")
                continue
            all_tokens.extend(rows)

    all_tokens = [token for token in all_tokens if host in token[0]]

    print(f"Found {len(all_tokens)} token for {host}")
    for token in all_tokens:
        print(f"{token[0]}: {token[1]}")

    if len(all_tokens) > 1:
        print("Using first token!")

    if len(all_tokens):
        return all_tokens[0][1]
    else:
        return None
if __name__ == '__main__':
    # Load the stored settings and interactively fill in whatever is missing
    config = complete_config(get_config_from_json())

    # Exports are collected in one directory per day
    output_base = f"results/{date.today():%Y%m%d}"
    print(f"Storing downloaded data in {output_base}")

    # Range of posts to be exported as string in format "YYYY-MM-DD". None means no filter is applied
    after = config.get("after")
    before = config.get("before")

    d = connect(config["host"], config.get("token"), config.get("username"), config.get("password"))
    user_id_to_name, my_user_id = get_users(d)
    team = select_team(d, my_user_id)
    channels = select_channel(d, team, my_user_id, user_id_to_name)

    n_channels = len(channels)
    for position, channel in enumerate(channels, start=1):
        print(f"Start export of channel {position}/{n_channels}")
        export_channel(d, channel, user_id_to_name, output_base, config["download_files"], before, after)
    print("Finished export")
mattermostdriver
@tomsbaltzer
Copy link

Hi Robert - this is awesome! Do you have a recommended tool for working with the downloaded json and files?
Thanks!
Tom.

@iapomi
Copy link

iapomi commented Apr 30, 2021

Hi Robert, thanks for this cool tool! Do you have any idea how to make it work without gitlab?

@RobertKrajewski
Copy link
Author

Hi Robert - this is awesome! Do you have a recommended tool for working with the downloaded json and files?
Thanks!
Tom.

I use VS Code to navigate through the json.

Hi Robert, thanks for this cool tool! Do you have any idea how to make it work without gitlab?

Instead of the token in line 12, you have to use the password key as shown here: https://pypi.org/project/mattermostdriver/

@Mar-Koeh
Copy link

Mar-Koeh commented Jun 9, 2022

In the method select_channel there is the line channel["display_name"] = user_id_to_name[other_user_id]. This fails if there is a direct conversation with a user that does not exist anymore. An ugly hack around that is importing exceptions from mattermostdriver and doing this instead:

            try:
                user_id_to_name[other_user_id] = d.users.get_user(other_user_id)["username"]
            except exceptions.ResourceNotFound:
                user_id_to_name[other_user_id] = other_user_id

For the same reason, we will also have to determine the team name differently when generating output in export_channel

    try:
        team = d.teams.get_team(channel["team_id"])["name"]
    except exceptions.ResourceNotFound:
        team = channel["team_id"]

            "team": team,

@Mar-Koeh
Copy link

Mar-Koeh commented Sep 2, 2022

The comment in line 162 is a lie. If the json file encodes a list instead of an object, it will be converted to a list, not a dict. Hence, there should be (dict, list) instead of dict in the next line.

                    if isinstance(resp, (dict, list)):

(otherwise the script crashed when trying to download json-files with a list as the outer element)

@GuillermoAndrade
Copy link

hello
When a channel is not related to a team, there is an exception in line 180.
I fixed it with an additional test:

    if channel["team_id"] == "":
        team_name = ""
    else:
        team_name = d.teams.get_team(channel["team_id"])["name"]
    output = {
        "channel": {
            "name": channel["name"],
            "display_name": channel["display_name"],
            "header": channel["header"],
            "id": channel["id"],
            "team": team_name,
            "team_id": channel["team_id"],
            "exported_at": datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
        },
        "posts": simple_posts
    }

@scholz
Copy link

scholz commented Jul 5, 2023

Thank you for providing this! Worked fine for me incl. using small fix from @GuillermoAndrade !

@alkemyst
Copy link

Can I use this in a project of mine, providing the link and recognition?

@RobertKrajewski
Copy link
Author

RobertKrajewski commented Aug 22, 2023 via email

@alkemyst
Copy link

Thanks a lot!

@thomas-mc-work
Copy link

Did someone manage to use this script with Server version 8? (8.1.1 to be specific)
I always get a 403 response, regardless of whether I use token or password login.

@commonism
Copy link

8.1.2 - works with minor changes

basically https://gist.github.com/RobertKrajewski/5847ce49333062ea4be1a08f2913288c?permalink_comment_id=4542090#gistcomment-4542090 & https://gist.github.com/RobertKrajewski/5847ce49333062ea4be1a08f2913288c?permalink_comment_id=4194956#gistcomment-4194956

I addressed the team_id by adding it manually - as the channel is returned by querying a team.

diff --git a/mattermost-dl.py b/mattermost-dl.py
index ac03bb7..f45a408 100644
--- a/mattermost-dl.py
+++ b/mattermost-dl.py
@@ -4,7 +4,7 @@ from datetime import datetime, date
 from typing import Tuple, Dict, List
 import getpass
 
-from mattermostdriver import Driver
+from mattermostdriver import Driver, exceptions
 import pathlib
 import json
 
@@ -14,8 +14,9 @@ def connect(host: str, login_token: str = None, username: str = None, password:
         "url": host,
         "port": 443,
         "token": login_token,
-        "username": username,
-        "password": password
+        "login_id": username,
+        "password": password,
+#        "debug":True
     })
     d.login()
     return d
@@ -61,6 +62,7 @@ def select_channel(d: Driver, team: str, my_user_id: str, user_id_to_name: Dict[
     channels = d.channels.get_channels_for_user(my_user_id, team["id"])
     # Add display name to direct messages
     for channel in channels:
+        channel["team_id"] = team["id"]
         if channel["type"] != "D":
             continue
 
@@ -127,7 +129,11 @@ def export_channel(d: Driver, channel: str, user_id_to_name: Dict[str, str], out
 
         user_id = post["user_id"]
         if user_id not in user_id_to_name:
-            user_id_to_name[user_id] = d.users.get_user(user_id)["username"]
+            try:
+                user_id_to_name[user_id] = d.users.get_user(user_id)["username"]
+            except exceptions.ResourceNotFound:
+                user_id_to_name[user_id] = user_id
+
         username = user_id_to_name[user_id]
         created = datetime.utcfromtimestamp(post["create_at"] / 1000).strftime('%Y-%m-%dT%H:%M:%SZ')
         message = post["message"]

@thomas-mc-work
Copy link

thomas-mc-work commented Nov 8, 2023

Thanks @commonism ! I could find out that my problem is not caused by this script. I have a strange issue on login. I can query the API perfectly fine on the CLI: curl -H "Authorization: Bearer $token" https://mattermost.mydomain.org:443/api/v4/users/me. The driver library does exactly the same, but always returns a status code 403.

@thomas-mc-work
Copy link

Found it out: Somehow the reverse proxy (Apache) is configured to require the User-Agent header! Painful experience …

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment