Skip to content

Instantly share code, notes, and snippets.

@alexbbt
Last active June 1, 2018 23:49
Show Gist options
  • Save alexbbt/4ed030967db47ddf37f17261a90e4f28 to your computer and use it in GitHub Desktop.
Save alexbbt/4ed030967db47ddf37f17261a90e4f28 to your computer and use it in GitHub Desktop.
Export slack messages in the same format as the standard slack export
Slacker
argparse
# Find it here: https://gist.github.com/alexbbt/4ed030967db47ddf37f17261a90e4f28/edit
# MIT License
# Copyright (c) 2018 Alexander Bell-Towne
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from slacker import Slacker
import json
import argparse
import os
import sys, time
# Run-time settings shared by every function in this script. The command
# line handling at the bottom of the file overwrites these defaults.
CONFIG = {
    'path': '.',         # directory (relative to the cwd) the export is written into
    'dryRun': False,     # when True no file is written and no history is fetched
    'overwrite': False,  # re-download conversations whose folder already exists
    'exclude': '',       # conversations whose name contains this string are skipped
    'verbose': False,    # print per-step messages instead of the progress spinner
    'isMember': False,   # only export public channels the user is a member of
}
def spinning_cursor():
    """Endlessly yield the four frames of a text spinner, one character at a time.

    The frames are a vertical bar, a slash, a dash and a backslash, repeated
    in that order for as long as the caller keeps asking.
    """
    frames = '|/-\\'
    position = 0
    while True:
        yield frames[position]
        # Wrap around to the first frame after the last one.
        position = (position + 1) % len(frames)
class RetryError(Exception):
    """Raised by retryloop when it stops retrying without a successful pass."""
def retryloop(attempts, timeout):
    """Generator that drives a ``for retry in retryloop(...)`` retry loop.

    Each iteration yields a callable. The loop body calls it to report that
    the pass failed; if the body finishes without calling it, the pass counts
    as a success and the generator ends normally.

    Args:
        attempts: maximum number of passes.
        timeout: seconds after the first pass started beyond which no further
            pass is made.

    Raises:
        RetryError: when every pass failed, or a pass failed after the
            timeout had elapsed.
    """
    began = time.time()
    # A non-empty set means "success"; the yielded ``clear`` empties it.
    outcome = set()
    for _ in range(attempts):
        outcome.add(True)
        yield outcome.clear
        if outcome:
            # The body never called the yielded function: the pass succeeded.
            return
        timedOut = time.time() > began + timeout
        if timedOut:
            break
    raise RetryError
# This script finds all channels, private channels and direct messages
# that your user participates in, downloads the complete history for
# those conversations and writes each conversation out to separate json files.
#
# This user centric history gathering is nice because the official slack data exporter
# only exports public channels.
#
# PS, this only works if your slack team has a paid account which allows for unlimited history.
#
# PPS, this use of the API is blessed by Slack.
# https://get.slack.help/hc/en-us/articles/204897248
# " If you want to export the contents of your own private groups and direct messages
# please see our API documentation."
#
# get your slack user token at the bottom of this page. These tokens are now legacy but can still be generated here:
# https://api.slack.com/custom-integrations/legacy-tokens
#
# dependencies:
# pip3 install slacker # https://github.com/os/slacker
#
# usage examples
# export token=<your token here>
# python3 slack_history.py $token
# python3 slack_history.py $token --dryRun
# python3 slack_history.py $token --skipDirectMessages
# python3 slack_history.py $token --skipDirectMessages --skipPrivateChannels
# python3 slack_history.py $token --overwrite # this will overwrite channels which have already been downloaded
# python3 slack_history.py $token --exclude=log # this will exclude any channel with 'log' in its name
# fetches the complete message history for a channel/group/im
#
# pageableObject could be:
# slack.channels
# slack.groups
# slack.im
#
# channelId is the id of the channel/group/im you want to download history for.
def getHistory(pageableObject, channelId, name, pageSize = 1000):
    """Fetch the message history of one conversation and write each page to disk.

    ``pageableObject.history`` is called repeatedly. Every non-empty page is
    handed to outputJson as ``<name>/<ts of the page's last message>.json``,
    and that same ``ts`` is sent as ``latest`` with the next request. The loop
    ends when a page comes back empty or its ``has_more`` flag is falsy.

    Any exception raised while fetching or writing a page is treated as a
    transient failure: the function sleeps (1 second, or 30 seconds once more
    than five failures have accumulated since the last long pause) and then
    requests the same page again. It never gives up on its own.

    Args:
        pageableObject: API object exposing ``history(channel, latest, oldest,
            count)`` whose result has a ``body`` attribute.
        channelId: id of the channel/group/im to download.
        name: folder name (below the export path) the page files go into.
        pageSize: number of messages requested per call.
    """
    lastTimestamp = 0
    hadPages = False
    errorCount = 0
    spinner = spinning_cursor()
    while True:
        try:
            response = pageableObject.history(
                channel = channelId,
                latest = lastTimestamp,
                oldest = 0,
                count = pageSize
            ).body
            messages = response['messages']
            if not messages:
                break
            # The last message of the page names the file and is also the
            # cursor for the next request.
            pageCursor = messages[-1]['ts']
            fileName = "{parent}/{file}.json".format(parent = name, file = pageCursor)
            outputJson(
                fileName,
                messages,
                "writing {0} records to {1}".format(len(messages), fileName)
            )
            if not CONFIG['verbose']:
                if hadPages:
                    # Erase the previous spinner frame before drawing the next.
                    sys.stdout.write('\b')
                sys.stdout.write(next(spinner))
                sys.stdout.flush()
            if response['has_more']:
                hadPages = True
                lastTimestamp = pageCursor
            else:
                break
        except Exception as error:
            # Everything is retried as if it were rate limiting, so at least
            # surface the real cause when the user asked for details.
            if CONFIG['verbose']:
                print("page failed: {0!r}".format(error))
            if errorCount > 5:
                if CONFIG['verbose']:
                    print("waiting 30 seconds to avoid rate limit")
                time.sleep(30)
                errorCount = 0
            else:
                if CONFIG['verbose']:
                    print("waiting 1 second to avoid rate limit")
                time.sleep(1)
                errorCount += 1
    if not CONFIG['verbose']:
        # Replace the spinner frame with a dot marking this conversation as done.
        sys.stdout.write('\b')
        sys.stdout.write('.')
        sys.stdout.flush()
def outputJson(fileName, data, message):
    """Write ``data`` as indented JSON to ``fileName`` below the export path.

    Does nothing when CONFIG['dryRun'] is set. Otherwise the export folder
    ``./<CONFIG['path']>`` is created if needed, the file is opened for
    writing, and ``message`` is printed when CONFIG['verbose'] is set.

    Args:
        fileName: path of the file relative to the export folder.
        data: JSON-serialisable object to dump.
        message: progress text shown in verbose mode.
    """
    if CONFIG['dryRun']:
        return
    exportRoot = "./{0}".format(CONFIG['path'])
    mkdir(exportRoot, True)
    target = "{0}/{1}".format(exportRoot, fileName)
    with open(target, 'w') as handle:
        if CONFIG['verbose']:
            print(message)
        json.dump(data, handle, indent=4)
def mkdir(directory, root = False):
    """Make sure a directory exists and report whether it already did.

    Args:
        directory: the directory to create.
        root: when true ``directory`` is used as given; otherwise it is
            resolved to ``./<CONFIG['path']>/<directory>``.

    Returns:
        True if the directory already existed, False if it was just created.
    """
    if root:
        target = directory
    else:
        target = "./{0}/{1}".format(CONFIG['path'], directory)
    alreadyThere = os.path.exists(target)
    if not alreadyThere:
        os.makedirs(target)
    return alreadyThere
def getObject(pageableObject, objectName, data, nameKey, idKey):
    """Write the listing for one kind of conversation, then export each entry.

    The whole ``data`` list is first written to ``<objectName>.json``. Unless
    CONFIG['dryRun'] is set, every entry then gets its own folder and its
    history is downloaded with getHistory. Entries are skipped when their name
    contains CONFIG['exclude'], or when their folder already exists and
    CONFIG['overwrite'] is off.

    Args:
        pageableObject: API object passed through to getHistory.
        objectName: label of the conversation kind; also the listing file stem.
        data: list of conversation dicts.
        nameKey: key in each dict holding the folder name.
        idKey: key in each dict holding the conversation id.
    """
    listingFile = "{objectName}.json".format(objectName = objectName)
    listingNote = "writing {objectName} file".format(objectName = objectName)
    outputJson(listingFile, data, listingNote)

    if CONFIG['dryRun']:
        return

    for index, entry in enumerate(data):
        folderName = entry[nameKey]
        labels = dict(index = index, objectName = objectName, folderName = folderName)

        pattern = CONFIG['exclude']
        if pattern and pattern in folderName:
            if CONFIG['verbose']:
                print("({index}) skipping {objectName} {folderName} as it is excluded".format(**labels))
            continue

        # mkdir both creates the folder and tells us whether it was already
        # there, so it has to run before the overwrite check.
        if mkdir(folderName) and not CONFIG['overwrite']:
            if CONFIG['verbose']:
                print("({index}) skipping {objectName} {folderName}".format(**labels))
            continue

        if CONFIG['verbose']:
            print()
            print("({index}) getting history for {objectName} {folderName}".format(**labels))

        for retry in retryloop(10000, timeout=2):
            try:
                getHistory(pageableObject, entry[idKey], folderName)
            except Exception:
                retry()
    # Terminate the line of progress characters written by getHistory.
    print()
# fetch and write history for all public channels
def getChannels(slack):
    """Fetch and write the history of the team's public channels.

    When the 'isMember' setting is on, only channels whose ``is_member`` flag
    is truthy are exported; otherwise every listed channel is. The selection
    is reported on stdout (names in verbose mode, a count otherwise) and then
    handed to getObject.

    Args:
        slack: API client exposing ``channels.list()`` and ``channels``.
    """
    allchannels = slack.channels.list().body['channels']
    # .get() keeps this usable when CONFIG still holds only its defaults,
    # which do not necessarily include the 'isMember' key.
    if CONFIG.get('isMember'):
        channels = [channel for channel in allchannels if channel['is_member']]
    else:
        channels = allchannels
    if CONFIG['verbose']:
        print("\nfound channels: ")
        for channel in channels:
            print(channel['name'])
    else:
        print("\nfound channels: {0}".format(len(channels)))
    getObject(slack.channels, 'channels', channels, 'name', 'id')
# fetch and write history for all direct message conversations
# also known as IMs in the slack API.
def getDirectMessages(slack, ownerID):
    """Fetch and write the history of every 1:1 direct message conversation.

    Each conversation dict gets a ``members`` list made of ``ownerID`` and the
    conversation's ``user`` before the list is handed to getObject, where the
    conversation id serves as both folder name and history id.

    Args:
        slack: API client exposing ``im.list()`` and ``im``.
        ownerID: user id of the account the token belongs to.
    """
    conversations = slack.im.list().body['ims']
    if not CONFIG['verbose']:
        print("\nfound direct messages (1:1): {0}".format(len(conversations)))
    else:
        print("\nfound direct messages (1:1) with the following users:")
        for conversation in conversations:
            print(conversation['user'])
    for conversation in conversations:
        conversation['members'] = [ownerID, conversation['user']]
    getObject(slack.im, 'dms', conversations, 'id', 'id')
# fetch and write history for all private channels
# also known as groups in the slack API.
def getPrivateChannels(slack):
    """Fetch and write the history of private channels and group messages.

    The groups returned by the API are split on their ``is_mpim`` flag:
    entries without it set are exported as 'groups', entries with it set as
    'mpims'. Each set is reported on stdout and then handed to getObject.

    Args:
        slack: API client exposing ``groups.list()`` and ``groups``.
    """
    everyGroup = slack.groups.list().body['groups']
    mpims = [entry for entry in everyGroup if entry['is_mpim']]
    privateChannels = [entry for entry in everyGroup if not entry['is_mpim']]

    if not CONFIG['verbose']:
        print("\nfound private channels: {0}".format(len(privateChannels)))
    else:
        print("\nfound private channels:")
        for entry in privateChannels:
            print("{0}: ({1} members)".format(entry['name'], len(entry['members'])))
    getObject(slack.groups, 'groups', privateChannels, 'name', 'id')

    if not CONFIG['verbose']:
        print("\nfound group messages: {0}".format(len(mpims)))
    else:
        print("\nfound group messages:")
        for entry in mpims:
            print("{0}: ({1} members)".format(entry['name'], len(entry['members'])))
    getObject(slack.groups, 'mpims', mpims, 'name', 'id')
# fetch all users in the slack organization and return a map userId -> user object
def getUsers(slack):
    """Write the user list to ``users.json`` and return it indexed by id.

    Args:
        slack: API client exposing ``users.list()``.

    Returns:
        dict mapping each user's ``id`` to the full user dict.
    """
    members = slack.users.list().body['members']
    outputJson("users.json", members, "writing users file")
    return {member['id']: member for member in members}
# get basic info about the slack channel to ensure the authentication token works
def doTestAuth(slack):
    """Call ``auth.test`` to check the token and print the team and user.

    Args:
        slack: API client exposing ``auth.test()``.

    Returns:
        The body of the ``auth.test`` response.
    """
    authInfo = slack.auth.test().body
    banner = "Successfully authenticated for team {0} and user {1} ".format(
        authInfo['team'],
        authInfo['user'],
    )
    print(banner)
    return authInfo
# Command line entry point: parse the options, store them in CONFIG, check the
# token, then export users, channels, direct messages and private channels.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='download slack history')
    parser.add_argument('token', help="an api token for a slack user")
    # Explicit default: without it argparse yields None and the export would
    # be written into a folder literally named "None".
    parser.add_argument('--path', default='.', help="where to store the data")
    parser.add_argument(
        '--dryRun',
        action='store_true',
        default=False,
        help="if dryRun is true, don't fetch/write history only get channel names")
    parser.add_argument(
        '--skipPrivateChannels',
        action='store_true',
        default=False,
        help="skip fetching history for private channels")
    parser.add_argument(
        '--skipChannels',
        action='store_true',
        default=False,
        help="skip fetching history for channels")
    parser.add_argument(
        '--skipDirectMessages',
        action='store_true',
        default=False,
        help="skip fetching history for directMessages")
    parser.add_argument(
        '--overwrite',
        action='store_true',
        default=False,
        help="Over write channels which have already been downloaded")
    parser.add_argument('--exclude', default='', help="a string to use to exclude channels")
    parser.add_argument(
        '--verbose',
        action='store_true',
        default=False,
        help="print more")
    parser.add_argument(
        '--isMember',
        action='store_true',
        default=False,
        help="only include channel in which you are a member")
    args = parser.parse_args()

    # Store the options before anything is written, so that the users file
    # honours --path and --dryRun like every other output.
    CONFIG['path'] = args.path
    CONFIG['dryRun'] = args.dryRun
    CONFIG['overwrite'] = args.overwrite
    CONFIG['exclude'] = args.exclude
    CONFIG['verbose'] = args.verbose
    CONFIG['isMember'] = args.isMember

    slack = Slacker(args.token)
    testAuth = doTestAuth(slack)
    users = getUsers(slack)

    if not args.skipChannels:
        getChannels(slack)
    if not args.skipDirectMessages:
        getDirectMessages(slack, testAuth['user_id'])
    if not args.skipPrivateChannels:
        getPrivateChannels(slack)
@alexbbt
Copy link
Author

alexbbt commented Jun 1, 2018

The files produced by this script can be zipped up and used with this slack viewer: https://github.com/hfaran/slack-export-viewer/blob/master/slackviewer/reader.py

Note: you need to use the source, as the version in pip does not work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment