Skip to content

Instantly share code, notes, and snippets.

@geekingfrog
Last active December 9, 2016 10:41
Show Gist options
  • Save geekingfrog/c775be13902e06cd5f2b to your computer and use it in GitHub Desktop.
Save geekingfrog/c775be13902e06cd5f2b to your computer and use it in GitHub Desktop.
Get a Slack team's history
import sys
import traceback
import os
import aiohttp
import asyncio
import datetime
# Base URL that every API method path (e.g. '/channels.list') is appended to.
API_URL = 'https://slack.com/api'
# NOTE(review): when run as a script, sys.argv[1] is the API token (see
# main), so this debug print writes the token to stdout.
print('args: ', sys.argv)
class APIException(Exception):
    """Raised when the API answers a request with a payload that is not ok."""
class SlackAPI():
    """Small asynchronous client for the Slack Web API.

    Every call is a GET request issued through one shared
    ``aiohttp.ClientSession`` and authenticated with ``token``.

    NOTE(review): the ``channels.*``, ``im.*`` and ``mpim.*`` methods used
    here look like Slack's older per-conversation-type endpoints; confirm
    they are still served before relying on this client.
    """

    # Channel ids that list_channels() leaves out. The default entry is a
    # channel that was flooded by a bot on the original author's account;
    # override this attribute (on the class or an instance) to change it.
    IGNORED_CHANNEL_IDS = frozenset(['C0EANS1S5'])

    def __init__(self, token):
        """Store ``token`` and open the HTTP session used for all requests."""
        self.token = token
        self.session = aiohttp.ClientSession()

    def close(self):
        """Close the HTTP session.

        Returns whatever ``session.close()`` returns so that a caller can
        await it when it is awaitable.
        NOTE(review): in recent aiohttp releases ``ClientSession.close()``
        is a coroutine and has no effect until awaited - confirm against the
        installed version.
        """
        print('closing session')
        return self.session.close()

    async def _make_request(self, path, params=None):
        """GET ``API_URL + path`` and return the decoded JSON payload.

        ``params`` are sent as query parameters; the token is added unless
        the caller already supplied one. The caller's dict is not modified.

        Raises:
            APIException: when the payload's ``ok`` field is not ``True``.
        """
        # Work on a copy: the original code injected the token straight
        # into the dict owned by the caller.
        query = dict(params) if params else {}
        query.setdefault('token', self.token)
        async with self.session.get(API_URL + path, params=query) as r:
            rjson = await r.json()
        if rjson.get('ok') is not True:
            # A payload without 'ok'/'error' is reported as an API error
            # rather than surfacing as a KeyError.
            raise APIException('ERROR for %s: %s' % (
                path, rjson.get('error', 'unknown error')))
        return rjson

    async def list_channels(self):
        """Return a ``Channel`` for every channel not in IGNORED_CHANNEL_IDS."""
        r = await self._make_request('/channels.list')
        return [Channel(self, chan) for chan in r['channels']
                if chan['id'] not in self.IGNORED_CHANNEL_IDS]

    async def list_users(self):
        """Return a ``User`` for every entry of ``users.list``."""
        r = await self._make_request('/users.list')
        return [User(usr) for usr in r['members']]

    async def list_im_channels(self):
        """Return an ``ImChannel`` for every entry of ``im.list``.

        The user list is fetched as well, because ``ImChannel`` resolves the
        name of the other participant from it.
        """
        r = await self._make_request('/im.list')
        usr_list = await self.list_users()
        return [ImChannel(self, chan, usr_list) for chan in r['ims']]

    async def list_mpim_channels(self):
        """Return an ``MPImChannel`` for every entry of ``mpim.list``."""
        r = await self._make_request('/mpim.list')
        return [MPImChannel(self, chan) for chan in r['groups']]
class User():
    """A team member, built from one entry of the ``users.list`` payload.

    Two users compare equal when they have the same ``id``; the hash is
    derived from ``id`` as well, so users can be stored in sets and used as
    dict keys.
    """

    def __init__(self, payload):
        """Read the fields of interest out of ``payload``.

        ``id``, ``name`` and ``deleted`` are required (``KeyError`` when
        missing); ``real_name`` defaults to ``''`` and the profile email to
        ``None``.
        """
        self.id = payload['id']
        self.name = payload['name']
        self.is_deleted = payload['deleted']
        self.real_name = payload.get('real_name', '')
        self.email = payload.get('profile', {}).get('email', None)

    def tsv_serialize(self):
        """Return one newline-terminated TSV line: id, name, real name, deleted."""
        return '%s\t%s\t%s\t%s\n' % (
            self.id,
            self.name,
            self.real_name,
            self.is_deleted
        )

    def __eq__(self, other):
        # Comparing with something that has no id (None, a str, ...) must
        # yield "not equal" instead of raising AttributeError.
        try:
            return self.id == other.id
        except AttributeError:
            return NotImplemented

    def __hash__(self):
        # Defining __eq__ alone makes the class unhashable; keep the hash
        # consistent with equality.
        return hash(self.id)
class GenericChannel():
    """Base class for every kind of conversation that has a history."""

    def __init__(self, api, id):
        """Keep the API client and the conversation id."""
        self.api = api
        self.id = id

    async def get_messages(self, path, oldest=None, count=100):
        """Fetch one page of messages through the history method at ``path``.

        Args:
            path: API method path, e.g. ``'/channels.history'``.
            oldest: timestamp to start from; ``None`` (the default) or any
                value that truncates to 0 is sent as 1.
            count: number of messages requested.

        Returns:
            A ``(messages, has_more)`` tuple taken from the response;
            ``has_more`` is ``False`` when the response does not carry it.
        """
        # int(None) raises TypeError, so the default has to be handled
        # before the conversion rather than with a trailing `or 1`.
        # NOTE(review): int() drops the fractional part of the timestamp, so
        # the request starts at the beginning of that second - callers may
        # get messages they have already seen.
        start = 1 if oldest is None else (int(oldest) or 1)
        params = {
            'channel': self.id,
            'oldest': start,
            'count': count,
        }
        r = await self.api._make_request(path, params=params)
        return (r['messages'], r.get('has_more', False))
class Channel(GenericChannel):
    """A channel built from one entry of the ``channels.list`` payload."""

    def __init__(self, api, payload):
        channel_id = payload['id']
        super().__init__(api, channel_id)
        self.name = payload['name']
        self.is_archived = payload['is_archived']

    def __str__(self):
        # Rendered as "<name> (<id>)"
        label = (self.name, self.id)
        return '%s (%s)' % label

    async def get_messages(self, **kwargs):
        """Fetch a page of this channel's history (see GenericChannel)."""
        page = await super().get_messages('/channels.history', **kwargs)
        return page
class ImChannel(GenericChannel):
    """A direct-message conversation, built from one ``im.list`` entry."""

    def __init__(self, api, payload, user_list):
        """Build the conversation and resolve the other participant's name.

        Args:
            api: client used to fetch the history.
            payload: dict with at least ``id``, ``user`` and ``created``.
            user_list: users in which ``payload['user']`` is looked up.
        """
        super().__init__(api, payload['id'])
        self.user_id = payload['user']
        other_usr = find_user(user_list, self.user_id)
        # find_user returns None for an unknown id; fall back to the raw id
        # instead of failing on `None.name`, so one unknown participant
        # does not abort the listing of every conversation.
        self.user_name = other_usr.name if other_usr is not None else self.user_id
        self.timestamp = payload['created']
        # to simplify a bit, make ImChannel really similar to Channel
        self.name = self.user_name

    def __str__(self):
        return 'Direct message to %s from %s' % (
            self.user_name, humanize_ts(self.timestamp))

    async def get_messages(self, **kwargs):
        """Fetch a page of this conversation's history (see GenericChannel)."""
        return await super().get_messages('/im.history', **kwargs)
class MPImChannel(GenericChannel):
    """A multiparty direct-message conversation from one ``mpim.list`` entry."""

    def __init__(self, api, payload):
        conversation_id = payload['id']
        super().__init__(api, conversation_id)
        self.name = payload['name']
        self.timestamp = payload['created']
        self.members = payload['members']

    def __str__(self):
        participants = self.members
        return 'multiparty direct messages with %s' % participants

    async def get_messages(self, **kwargs):
        """Fetch a page of this conversation's history (see GenericChannel)."""
        page = await super().get_messages('/mpim.history', **kwargs)
        return page
def humanize_ts(timestamp):
    """Render a numeric timestamp as a 'YYYY-MM-DD HH:MM:SS' string."""
    return datetime.datetime.fromtimestamp(timestamp).strftime(
        '%Y-%m-%d %H:%M:%S')
def find_user(user_list, user_id):
    """Return the first user of ``user_list`` whose id is ``user_id``, else None."""
    # Linear scan: awfully inefficient, but not very important here
    matches = (candidate for candidate in user_list if candidate.id == user_id)
    return next(matches, None)
class GenericChanSerializer():
    """Append the history of one conversation to a tab-separated file.

    Each saved line holds the message timestamp, the user id and the text
    (newlines in the text are written as a literal backslash + 'n'). The
    timestamp of the last line is used to resume where the previous run
    stopped.
    """

    def __init__(self, api, channel, fd_path):
        """Keep the channel to save and the path of its destination file."""
        self.channel = channel
        self._fd_path = fd_path
        self._api = api

    def find_latest_ts(self):
        """Return the timestamp stored on the last line of the file.

        Returns 1.0 when the file is missing or empty, i.e. "fetch from the
        beginning of time". Any other failure (for instance a last line
        whose first column is not a number) is reported on stdout/stderr
        and re-raised.
        """
        try:
            last_line = None
            with open(self._fd_path, 'r') as file_descriptor:
                # Walk the file line by line so that only the current line
                # is held in memory, not the whole history.
                for last_line in file_descriptor:
                    pass
            if last_line is None:
                # Empty file: nothing saved for this channel yet
                return 1.0
            return float(last_line.split('\t')[0])
        except FileNotFoundError:
            # Nothing saved for this channel yet, fetch from the beginning
            # of time
            return 1.0
        except Exception:
            print('ERROR for %s' % self._fd_path)
            traceback.print_exc(file=sys.stderr)
            raise

    async def save_all_history(self):
        """Fetch every message newer than the last saved one and append it.

        Pages are requested until the channel reports that nothing is left
        or a page brings no message newer than what is already on disk.
        Messages without a ``user`` field are skipped.
        """
        latest_ts = self.find_latest_ts()
        has_more = True
        with open(self._fd_path, 'a') as dest:
            while has_more:
                print('fetching %s messages from %s' % (self.channel.name,
                                                        humanize_ts(latest_ts)))
                (messages, has_more) = await self.channel.get_messages(
                    oldest=latest_ts, count=1000)  # maximum possible count
                # The page is reversed before writing, as the original code
                # did (presumably it arrives newest first - confirm).
                # Messages at or before latest_ts are dropped: the request
                # boundary may be coarser than the saved timestamp, and
                # writing them again would duplicate lines on every page
                # and on every run.
                fresh = [msg for msg in reversed(messages)
                         if float(msg['ts']) > latest_ts]
                if not fresh:
                    print('Got everything from channel %s' % self.channel.name)
                    return
                written = 0
                for msg in fresh:
                    # handle bot messages or other weird subtypes
                    if 'user' not in msg:
                        continue
                    text = msg.get('text') or ''
                    dest.write('%s\t%s\t%s\n' % (msg['ts'],
                                                 msg['user'],
                                                 text.replace('\n', '\\n')))
                    written += 1
                dest.flush()
                # Strictly increases on every iteration, so the loop always
                # makes progress.
                latest_ts = float(fresh[-1]['ts'])
                print('wrote %d messages to %s, more to follow? %s' %
                      (written, self._fd_path, has_more))
class ChanSerializer(GenericChanSerializer):
    """Serializer writing a channel to data/channels/<name>_<id>.tsv."""

    def __init__(self, api, channel):
        stem = '%s_%s' % (channel.name, channel.id)
        file_name = '%s.tsv' % stem
        super().__init__(api, channel, os.path.join('data/channels', file_name))
class ImChanSerializer(GenericChanSerializer):
    """Serializer writing a direct-message conversation to
    data/im_channels/<user name>_<id>.tsv."""

    def __init__(self, api, im_chan):
        stem = '%s_%s' % (im_chan.user_name, im_chan.id)
        file_name = '%s.tsv' % stem
        super().__init__(api, im_chan,
                         os.path.join('data/im_channels', file_name))
class MPImChanSerializer(GenericChanSerializer):
    """Serializer writing a multiparty conversation to
    data/mpim_channels/<name>_<id>.tsv."""

    def __init__(self, api, mpim_chan):
        stem = '%s_%s' % (mpim_chan.name, mpim_chan.id)
        file_name = '%s.tsv' % stem
        super().__init__(api, mpim_chan,
                         os.path.join('data/mpim_channels', file_name))
async def save_list_users(api):
    """Fetch the users through ``api`` and overwrite ./data/users.tsv with
    one serialized line per user."""
    print('saving list of users')
    path = './data/users.tsv'
    users = await api.list_users()
    with open(path, 'w') as dest:
        dest.writelines(usr.tsv_serialize() for usr in users)
    summary = (len(users), path)
    print('Successfully saved %d users to %s' % summary)
async def main():
    """Save the user list and the history of every conversation.

    Expects the API token as the only command-line argument; prints a usage
    line and exits with status 1 otherwise. All conversations are saved
    concurrently. A failure on one conversation does not stop the others,
    but every failure is printed to stderr and a ``RuntimeError`` is raised
    once all of them have finished. The API session is closed in every
    case.
    """
    import inspect

    if len(sys.argv) != 2:
        print('Usage: %s <token>' % sys.argv[0])
        sys.exit(1)
    token = sys.argv[1]
    api = SlackAPI(token)
    try:
        await save_list_users(api)
        channels = [chan for chan in await api.list_channels()
                    if not chan.is_archived]
        im_channels = await api.list_im_channels()
        mpim_channels = await api.list_mpim_channels()
        print('Will retrieve history for %d public channels' % len(channels))
        print('Will retrieve history for %d private channels' %
              len(im_channels))
        print('Will retrieve history for %d multiparty direct channels' %
              len(mpim_channels))
        public_history = [ChanSerializer(api, chan).save_all_history()
                          for chan in channels]
        private_history = [ImChanSerializer(api, chan).save_all_history()
                           for chan in im_channels]
        mpim_history = [MPImChanSerializer(api, chan).save_all_history()
                        for chan in mpim_channels]
        # gather() accepts bare coroutines and an empty list, and hands the
        # exceptions back; asyncio.wait() rejects both on recent Python
        # versions and never reports task failures on its own.
        results = await asyncio.gather(
            *(public_history + private_history + mpim_history),
            return_exceptions=True)
    finally:
        # The session was never released before. close() may hand back an
        # awaitable depending on the aiohttp version - await it if so.
        closing = api.close()
        if inspect.isawaitable(closing):
            await closing
    failures = [res for res in results if isinstance(res, BaseException)]
    for failure in failures:
        traceback.print_exception(type(failure), failure,
                                  failure.__traceback__, file=sys.stderr)
    if failures:
        raise RuntimeError('%d conversation(s) could not be saved' %
                           len(failures))
    print('All done \\o/')
def init_directories():
    """Create the output directories; ones that already exist are left alone."""
    wanted = (
        './data',
        './data/channels',
        './data/im_channels',
        './data/mpim_channels',
    )
    for directory in wanted:
        os.makedirs(directory, exist_ok=True)
# Script entry point: create the output directories, run main() to
# completion, and exit with status 1 after printing the traceback when
# anything fails.
if __name__ == '__main__':
    try:
        init_directories()
        if hasattr(asyncio, 'run'):
            # Preferred where available: creates and closes its own loop.
            asyncio.run(main())
        else:
            # Older interpreters without asyncio.run
            asyncio.get_event_loop().run_until_complete(main())
    except Exception:
        # `except Exception` rather than a bare except: the sys.exit() of
        # the usage path and Ctrl-C must not be reported as crashes.
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)
    print('all done')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment