-
-
Save Chandler/fb7a070f52883849de35 to your computer and use it in GitHub Desktop.
# The author retired this script: it prints a notice and stops before any of
# the code below runs.  SystemExit is raised directly instead of calling
# exit(), which is only injected by the `site` module for interactive use.
print("UPDATE AUG 2023: this script is beyond old and broken")
print("You may find interesting and more up to date resources in the comments of the gist")
raise SystemExit
from slacker import Slacker | |
import json | |
import argparse | |
import os | |
# This script finds all channels, private channels and direct messages | |
# that your user participates in, downloads the complete history for | |
# those conversations and writes each conversation out to separate json files.
# | |
# This user centric history gathering is nice because the official slack data exporter | |
# only exports public channels. | |
# | |
# PS, this only works if your slack team has a paid account which allows for unlimited history. | |
# | |
# PPS, this use of the API is blessed by Slack. | |
# https://get.slack.help/hc/en-us/articles/204897248 | |
# " If you want to export the contents of your own private groups and direct messages | |
# please see our API documentation." | |
# | |
# get your slack user token at the bottom of this page | |
# https://api.slack.com/web | |
# | |
# dependencies: | |
# pip install slacker # https://github.com/os/slacker | |
# | |
# usage examples | |
# python slack_history.py --token='123token' | |
# python slack_history.py --token='123token' --dryRun
# python slack_history.py --token='123token' --skipDirectMessages | |
# python slack_history.py --token='123token' --skipDirectMessages --skipPrivateChannels | |
# fetches the complete message history for a channel/group/im | |
# | |
# pageableObject could be: | |
# slack.channels
# slack.groups | |
# slack.im | |
# | |
# channelId is the id of the channel/group/im you want to download history for. | |
def getHistory(pageableObject, channelId, pageSize = 100):
    """Download the complete message history of one conversation.

    Args:
        pageableObject: API endpoint object exposing
            ``history(channel=, latest=, oldest=, count=)`` whose result has a
            ``body`` dict with ``messages`` and ``has_more`` entries.
        channelId: id of the channel/group/im to download.
        pageSize: number of messages requested per call (sent as ``count``).

    Returns:
        A list with the messages of every page, in the order they were
        received.
    """
    messages = []
    lastTimestamp = None
    while True:
        response = pageableObject.history(
            channel=channelId,
            latest=lastTimestamp,
            oldest=0,
            count=pageSize,
        ).body
        page = response['messages']
        messages.extend(page)
        # Stop on the last page.  An empty page also ends the loop even when
        # has_more is set: there would be no new timestamp to continue from,
        # so the same request would be repeated forever.
        if not page or not response.get('has_more'):
            break
        # The next request is bounded by the last message received so far.
        lastTimestamp = page[-1]['ts']
    return messages
def mkdir(directory):
    """Create ``directory`` (and any missing parents) if it does not exist.

    Uses ``exist_ok=True`` instead of a separate existence check, so there is
    no window between the check and the creation.

    Raises:
        FileExistsError: if the path exists but is not a directory.
    """
    os.makedirs(directory, exist_ok=True)
# fetch and write history for all public channels | |
def getChannels(slack, dryRun):
    """List all public channels and, unless ``dryRun``, export their history.

    Each channel is written to ``channels/<channel name>.json`` as a dict with
    the keys ``channel_info`` and ``messages``.

    Args:
        slack: API client exposing ``channels.list()``, ``channels.info()``
            and ``channels.history()``.
        dryRun: when true, only print the channel names; nothing is fetched
            or written.
    """
    channels = slack.channels.list().body['channels']

    print("\nfound channels: ")
    for channel in channels:
        print(channel['name'])

    if dryRun:
        return

    parentDir = "channels"
    mkdir(parentDir)
    for channel in channels:
        print("getting history for channel {0}".format(channel['name']))
        fileName = "{parent}/{file}.json".format(parent=parentDir, file=channel['name'])
        # Fetch everything before opening the file so that a failed request
        # does not leave an empty export behind.
        messages = getHistory(slack.channels, channel['id'])
        channelInfo = slack.channels.info(channel['id']).body['channel']
        with open(fileName, 'w') as outFile:
            print("writing {0} records to {1}".format(len(messages), fileName))
            json.dump({'channel_info': channelInfo, 'messages': messages}, outFile, indent=4)
# fetch and write history for all direct message conversations | |
# also known as IMs in the slack API. | |
def getDirectMessages(slack, ownerId, userIdNameMap, dryRun):
    """List all 1:1 conversations and, unless ``dryRun``, export their history.

    Each conversation is written to ``direct_messages/<user name>.json`` as a
    dict with the keys ``channel_info`` (the two member ids) and ``messages``.

    Args:
        slack: API client exposing ``im.list()`` and ``im.history()``.
        ownerId: user id of the token owner, recorded as the second member.
        userIdNameMap: dict mapping user id to user name; ids missing from it
            are shown as ``"<id> (name unknown)"``.
        dryRun: when true, only print the conversation partners.
    """
    def displayName(dm):
        # Single place for the "unknown user" fallback used both for the
        # listing and for the output file name.
        return userIdNameMap.get(dm['user'], dm['user'] + " (name unknown)")

    dms = slack.im.list().body['ims']

    print("\nfound direct messages (1:1) with the following users:")
    for dm in dms:
        print(displayName(dm))

    if dryRun:
        return

    parentDir = "direct_messages"
    mkdir(parentDir)
    for dm in dms:
        name = displayName(dm)
        print("getting history for direct messages with {0}".format(name))
        fileName = "{parent}/{file}.json".format(parent=parentDir, file=name)
        messages = getHistory(slack.im, dm['id'])
        channelInfo = {'members': [dm['user'], ownerId]}
        with open(fileName, 'w') as outFile:
            print("writing {0} records to {1}".format(len(messages), fileName))
            json.dump({'channel_info': channelInfo, 'messages': messages}, outFile, indent=4)
# fetch and write history for all private channels | |
# also known as groups in the slack API. | |
def getPrivateChannels(slack, dryRun):
    """List all private channels and, unless ``dryRun``, export their history.

    Each private channel is written to ``private_channels/<name>.json`` as a
    dict with the keys ``channel_info`` and ``messages``.

    Args:
        slack: API client exposing ``groups.list()``, ``groups.info()`` and
            ``groups.history()``.
        dryRun: when true, only print the names and member counts.
    """
    groups = slack.groups.list().body['groups']

    print("\nfound private channels:")
    for group in groups:
        print("{0}: ({1} members)".format(group['name'], len(group['members'])))

    if dryRun:
        return

    parentDir = "private_channels"
    mkdir(parentDir)
    for group in groups:
        print("getting history for private channel {0} with id {1}".format(group['name'], group['id']))
        fileName = "{parent}/{file}.json".format(parent=parentDir, file=group['name'])
        messages = getHistory(slack.groups, group['id'])
        channelInfo = slack.groups.info(group['id']).body['group']
        with open(fileName, 'w') as outFile:
            print("writing {0} records to {1}".format(len(messages), fileName))
            json.dump({'channel_info': channelInfo, 'messages': messages}, outFile, indent=4)
# fetch all users for the channel and return a map userId -> userName | |
def getUserMap(slack):
    """Return a dict mapping user id to user name for the members listed by the API.

    Args:
        slack: API client exposing ``users.list()`` whose body has a
            ``members`` list of dicts with ``id`` and ``name`` entries.

    Returns:
        ``{user_id: user_name}`` for every member in the response.
    """
    users = slack.users.list().body['members']
    userIdNameMap = {user['id']: user['name'] for user in users}
    print("found {0} users ".format(len(users)))
    return userIdNameMap
# get basic info about the slack channel to ensure the authentication token works | |
def doTestAuth(slack):
    """Call ``auth.test`` to confirm the token works and report who it belongs to.

    Args:
        slack: API client exposing ``auth.test()``.

    Returns:
        The ``auth.test`` response body (a dict containing at least ``team``
        and ``user``).
    """
    testAuth = slack.auth.test().body
    print("Successfully authenticated for team {0} and user {1} ".format(
        testAuth['team'], testAuth['user']))
    return testAuth
if __name__ == "__main__":
    # Command-line entry point: authenticate, then export each conversation
    # type unless it was skipped on the command line.
    parser = argparse.ArgumentParser(description='download slack history')
    # The token is mandatory: without it every API call fails, so let
    # argparse report the missing option up front.
    parser.add_argument(
        '--token',
        required=True,
        help="an api token for a slack user")
    parser.add_argument(
        '--dryRun',
        action='store_true',
        default=False,
        help="if dryRun is true, don't fetch/write history only get channel names")
    parser.add_argument(
        '--skipPrivateChannels',
        action='store_true',
        default=False,
        help="skip fetching history for private channels")
    parser.add_argument(
        '--skipChannels',
        action='store_true',
        default=False,
        help="skip fetching history for channels")
    parser.add_argument(
        '--skipDirectMessages',
        action='store_true',
        default=False,
        help="skip fetching history for directMessages")
    args = parser.parse_args()

    slack = Slacker(args.token)
    testAuth = doTestAuth(slack)
    userIdNameMap = getUserMap(slack)
    dryRun = args.dryRun

    if not dryRun:
        # Record who ran the export and the id -> name map next to the data.
        with open('metadata.json', 'w') as outFile:
            print("writing metadata")
            metadata = {
                'auth_info': testAuth,
                'users': userIdNameMap
            }
            json.dump(metadata, outFile, indent=4)

    # Each exporter honours dryRun itself (listing only), so it is passed on.
    if not args.skipChannels:
        getChannels(slack, dryRun)
    if not args.skipPrivateChannels:
        getPrivateChannels(slack, dryRun)
    if not args.skipDirectMessages:
        getDirectMessages(slack, testAuth['user_id'], userIdNameMap, dryRun)
Use this code below. I've used new methods that are not deprecated.
def is_matching_channel(channel_to_check):
    """Tell whether a channel is selected by the module-level ``CHANNEL_NAME`` filters.

    Args:
        channel_to_check: channel dict with a ``name`` entry.

    Returns:
        True when ``CHANNEL_NAME`` is empty (no filtering) or when any of its
        entries is a substring of the channel name; False otherwise.
    """
    if not CHANNEL_NAME:
        return True
    channel_name = channel_to_check['name']
    return any(option in channel_name for option in CHANNEL_NAME)
def getHistory(pageableObject, channelId, pageSize=100):
    """Download the complete message history of one conversation.

    Args:
        pageableObject: API endpoint object exposing
            ``history(channel=, latest=, oldest=)`` whose result has a
            ``body`` dict with ``messages`` and ``has_more`` entries.
        channelId: id of the conversation to download.
        pageSize: kept for signature compatibility; it is not forwarded to
            the API call.
            NOTE(review): presumably it should be sent as the endpoint's
            page-size argument - confirm the parameter name before wiring it.

    Returns:
        A list with the messages of every page, in the order they were
        received.
    """
    messages = []
    lastTimestamp = None
    while True:
        response = pageableObject.history(
            channel=channelId,
            latest=lastTimestamp,
            oldest=0
        ).body
        page = response['messages']
        messages.extend(page)
        # Stop on the last page.  An empty page also ends the loop even when
        # has_more is set: there would be no new timestamp to continue from,
        # so the same request would be repeated forever.
        if not page or not response.get('has_more'):
            break
        # The next request is bounded by the last message received so far.
        lastTimestamp = page[-1]['ts']
    return messages
def mkdir(directory):
    """Create ``directory`` (and any missing parents) if it does not exist.

    Uses ``exist_ok=True`` instead of a separate existence check, so there is
    no window between the check and the creation.

    Raises:
        FileExistsError: if the path exists but is not a directory.
    """
    os.makedirs(directory, exist_ok=True)
def getUserMap():
    """Fetch the workspace members and append them to ``all_slack_users_list``.

    Reads the module-level ``slack_api_id`` token and extends the module-level
    ``all_slack_users_list`` in place with one dict per member.  Pages of up
    to 1000 members are requested from ``users.list`` until the response no
    longer carries a ``next_cursor``.

    Any request or response-format failure is reported on stdout and ends the
    download; members fetched before the failure are kept.
    """
    print("Getting Slack Users")
    # The token travels in the Authorization header rather than in the URL.
    headers = {'Authorization': 'Bearer %s' % slack_api_id}
    cursor_id = None
    try:
        while True:
            params = {'limit': 1000}
            if cursor_id:
                params['cursor'] = cursor_id
            page = requests.get(
                'https://slack.com/api/users.list',
                headers=headers,
                params=params,
            ).json()
            # extend, not append: the list must hold member dicts only.
            all_slack_users_list.extend(page['members'])
            cursor_id = (page.get('response_metadata') or {}).get('next_cursor')
            if not cursor_id:
                break
            print("Still grabbing users...")
    except (requests.RequestException, KeyError, ValueError):
        # KeyError: response without 'members' (e.g. an API error payload);
        # ValueError: body that is not valid JSON.
        print("Couldn't grab your Slack Users. Sorry!")
    print("Done grabbing users")
    print("Found ", len(all_slack_users_list), " users")
def get_user_name(message):
    """Return the real name of the user who posted ``message``.

    Looks the message's ``user`` id up in the module-level
    ``all_slack_users_list``.

    Args:
        message: message dict; the poster id is read from its ``user`` entry.

    Returns:
        The ``real_name`` of the first listed user whose ``id`` matches, or
        ``'No Username'`` when the message has no ``user`` entry or no listed
        user with a ``real_name`` matches.
    """
    try:
        user_id = message['user']
    except (KeyError, TypeError):
        return 'No Username'
    for user in all_slack_users_list:
        # Entries that are not dicts or lack a real_name are skipped rather
        # than treated as errors.
        if isinstance(user, dict) and user.get('id') == user_id and 'real_name' in user:
            return user['real_name']
    return 'No Username'
def _list_all_channels():
    """Return every public and private channel from ``conversations.list``, following cursors."""
    # The token travels in the Authorization header rather than in the URL.
    headers = {'Authorization': 'Bearer %s' % slack_api_id}
    channels = []
    cursor = None
    while True:
        params = {'types': 'public_channel,private_channel', 'limit': 200}
        if cursor:
            params['cursor'] = cursor
        page = requests.get(
            'https://slack.com/api/conversations.list',
            headers=headers,
            params=params,
        ).json()
        channels.extend(page['channels'])
        cursor = (page.get('response_metadata') or {}).get('next_cursor')
        if not cursor:
            return channels


def _message_day(message):
    """Return ``(date, 'MM-DD-YYYY' string)`` for a message's ``ts``; today when it cannot be parsed."""
    try:
        moment = datetime(1970, 1, 1) + timedelta(seconds=float(message['ts']))
    except (KeyError, TypeError, ValueError, OverflowError):
        moment = datetime.today()
    day = moment.date()
    return day, day.strftime('%m-%d-%Y')


def get_channels(slack):
    """Export the messages of all matching channels into one Excel workbook.

    Reads the module-level settings ``slack_api_id``, ``CHANNEL_NAME``,
    ``desired_filename``, ``time_period_start_date`` and
    ``time_period_end_date``.  The workbook is written to
    ``Slack_Channel_Messages/<desired_filename>.xlsx`` with one row per
    message whose date lies inside the requested period (both ends included).

    Args:
        slack: API client whose ``conversations`` endpoint is handed to
            ``getHistory``.

    Raises:
        SystemExit: when no channel matches the configured filters.
    """
    print("Grabbing Your Channels...")
    print("*****************")
    channels = _list_all_channels()
    matching_channels = [channel for channel in channels if is_matching_channel(channel)]

    if not matching_channels:
        print("No matching channels found with keyword: " + str(CHANNEL_NAME))
        print("Please run the program again.")
        raise SystemExit

    print("Found Matching Channels: ")
    for channel in matching_channels:
        print(channel['name'])
    print("*****************")
    print("*****************")
    print("*****************")
    print('Creating your Excel File...')
    print("*****************")

    parentDir = "Slack_Channel_Messages"
    mkdir(parentDir)
    fileName = "{parent}/{file}.xlsx".format(parent=parentDir, file=desired_filename)
    columns = 'ABCDEFG'
    titles = ('MESSAGE', 'REPLY_COUNT', 'REPLY_USERS_COUNT', 'DATE',
              'CHANNEL_NAME', 'USER_NAME', 'CHANNEL_TYPE')

    workbook = xlsxwriter.Workbook(fileName)
    try:
        worksheet = workbook.add_worksheet()
        row = 1
        for column, title in zip(columns, titles):
            worksheet.write(column + str(row), title)

        for channel in matching_channels:
            public_or_private = 'private' if channel.get('is_private') else 'public'
            print("Getting messages for " + public_or_private + " channel {0}".format(channel['name']))
            try:
                messages = getHistory(slack.conversations, channel['id'])
            except Exception:
                # Best effort: a channel whose history cannot be read is
                # skipped and the export continues with the next one.
                print("No messages for channel.")
                messages = []

            if messages:
                try:
                    messages_wrote_count = 0
                    for message in messages:
                        message_date, message_date_string = _message_day(message)
                        if not (time_period_start_date <= message_date <= time_period_end_date):
                            continue
                        username = get_user_name(message)
                        row += 1
                        # Messages without replies carry no reply counters;
                        # those cells are written as 0.
                        values = (
                            message.get('text', ''),
                            message.get('reply_count', 0),
                            message.get('reply_users_count', 0),
                            message_date_string,
                            channel['name'],
                            username,
                            public_or_private,
                        )
                        for column, value in zip(columns, values):
                            worksheet.write(column + str(row), value)
                        messages_wrote_count += 1
                    print("Wrote (" + str(messages_wrote_count) + ") messages from channel '" + channel['name'] + "'")
                except Exception:
                    # Best effort per channel: report and go on.
                    print("Error grabbing messages for channel: " + channel['name'])
            print("*****************")
    finally:
        # Always finalize the workbook so rows written so far are saved.
        workbook.close()
# get basic info about the slack channel to ensure the authentication token works
def do_test_auth(slack):
    """Call ``auth.test`` to confirm the token works and report who it belongs to.

    Args:
        slack: API client exposing ``auth.test()``.

    Returns:
        The ``auth.test`` response body (a dict containing at least ``team``
        and ``user``).
    """
    testAuth = slack.auth.test().body
    print("Successfully authenticated for team {0} and user {1} ".format(
        testAuth['team'], testAuth['user']))
    return testAuth
#Run Program
def _prompt_channel_filters():
    """Ask for channel-name filters until 'all' or 'done'; return the list ([] means all channels)."""
    filters = []
    while True:
        addition = input("Channel to Look for Messages in: ")
        choice = addition.lower()
        if choice == 'all':
            print("Let's grab all the channels!")
            return []
        if choice == 'done':
            if filters:
                return filters
            print("You need to enter in a channel first...")
        elif addition:
            filters.append(addition)
            print(filters)


def _prompt_date(prompt, error_message, latest):
    """Ask until a 10-character MM-DD-YYYY date not after ``latest`` is given; return (text, date)."""
    while True:
        text = str(input(prompt))
        print("*****************")
        try:
            entered = datetime.strptime(text, '%m-%d-%Y').date()
        except ValueError:
            # Malformed input is treated like any other invalid date.
            entered = None
        if len(text) == 10 and entered is not None and entered <= latest:
            return text, entered
        print(error_message)
        print("*****************")


if __name__ == "__main__":
    # Interactive entry point.  The names assigned here are module-level
    # settings read by the functions above (CHANNEL_NAME, today,
    # time_period_*_date, desired_filename, slack_api_id,
    # all_slack_users_list).
    parser = argparse.ArgumentParser(description='download slack history')
    parser.add_argument('--token', help="an api token for a slack user")
    args = parser.parse_args()

    print("*****************")
    print("Slack Channel Messages Exporter")
    print("Created by Chandler Abraham")
    print("Optimized by Nick Canfield")
    print("*****************")
    print("Let's get your deets...")

    print("Please type in the exact channel name, or common text for multiple channel names, to search in.")
    print("For all channels, type 'all'")
    print("Done entering? Type 'done'")
    print("*****************")
    CHANNEL_NAME = _prompt_channel_filters()

    today_date = datetime.today().date()
    today = today_date.strftime('%m-%d-%Y')
    print("*****************")
    print("Today's Date: ", today)

    # Keep asking for both dates until the start is not after the end.
    while True:
        timeperiod_start, time_period_start_date = _prompt_date(
            'Please input the day you want to start looking for messages\n Format "MM-DD-YYYY": ',
            "ERROR! Invalid start time. Input again",
            today_date)
        timeperiod_end, time_period_end_date = _prompt_date(
            'Please input the end date you want to start looking for messages\n Format "MM-DD-YYYY": ',
            "ERROR! Invalid end time. Input again",
            today_date)
        if time_period_start_date <= time_period_end_date:
            print("Your dates were valid!")
            print("Program will gather messages from: " + timeperiod_start + " --> " + timeperiod_end)
            break
        print("Your end date was before begin date.")
        print("Please re-enter your date values...")

    desired_filename = str(input('Please input your desired filename: '))
    print("*****************")

    # A token given on the command line is used as is; otherwise it is asked for.
    slack_api_id = args.token or str(input('Please input your slack api user token: '))
    print("*****************")

    all_slack_users_list = []
    try:
        slack = Slacker(slack_api_id)
        testAuth = do_test_auth(slack)
        getUserMap()
    except Exception:
        print("Error. Couldn't find your Slack account or internet connection error.")
    else:
        get_channels(slack)
        # The success banner is printed only after the export actually ran.
        print("*****************")
        print("*****************")
        print("Congrats! Program ran successfully!")
        print("Please check your 'Slack_Channel_Messages' folder for your excel file named '" + desired_filename + ".xlsx'")
It seems that, as of May 2020, Slack has disabled the ways to create tokens. Since the script depends on being given a token as an argument, is there any way for it to work without a token?
https://api.slack.com/legacy/custom-integrations/legacy-tokens
Thanks, @nickcanfield29 for guidance.
I can definitely try it. a slack app to export direct messages into files
All I want to do is export the "direct messages" into a file for future reading/searching.
Our org has decided to move to a different application and is therefore no longer willing to pay for Slack's paid edition for everyone.
I tried the Evernote clip method, but I don't have the paid version of Evernote, so I reached the limit within a single direct message.
https://www.quora.com/Is-it-possible-to-export-Slack-chats-If-so-then-how
Hey Everyone,
I've taken this code and updated it to download messages directly into Excel files on your local computer. Check it out!
https://gist.github.com/nickcanfield29/7e3cef8f3ca58d3dfde205d2bb17bf02
the link does not work
Does anyone know if using this script on a free account wouldn't just download the last 10,000 messages as opposed to the entire history?
It seems like I need slack admin approval to create this app.
Hello,
I've not been able to use this script, so I built a new one using the new API: https://gist.github.com/benoit-cty/a5855dea9a4b7af03f1f53c07ee48d3c
It is less powerful, but it works as of April 2021.
This should also include replies to messages via threads
Which scopes should I add? 🤔
>python slack_history.py --token='xoxp-XXXXXXX'
Traceback (most recent call last):
File "C:\Users\XXX\Documents\abc\slack\slack_history.py", line 204, in <module>
testAuth = doTestAuth(slack)
File "C:\Users\XXX\Documents\abc\slack\slack_history.py", line 165, in doTestAuth
testAuth = slack.auth.test().body
File "C:\Users\XXX\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\slacker\__init__.py", line 140, in test
return self.get('auth.test')
File "C:\Users\XXX\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\slacker\__init__.py", line 118, in get
return self._request(
File "C:\Users\XXX\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\slacker\__init__.py", line 102, in _request
raise Error(response.error)
slacker.Error: invalid_auth
Hello, try with new API : https://gist.github.com/benoit-cty/a5855dea9a4b7af03f1f53c07ee48d3c
For people coming from a Google search, I found this script working well: https://github.com/sebseager/slack-exporter
Follow the instructions carefully.
The missing scope error is due to "incomplete" permissions via OAuth. I just got bored and added the entire suite of permissions. But now I get another error :(