Skip to content

Instantly share code, notes, and snippets.

@gigamonkey
Last active December 13, 2017 23:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gigamonkey/1476bdcc7dad7fd2eaead854202e02cd to your computer and use it in GitHub Desktop.
Save gigamonkey/1476bdcc7dad7fd2eaead854202e02cd to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
#
# Dump all the messages from a given Slack channel since a certain
# timestamp, one line per message with timestamp and user name.
#
from datetime import datetime
from pytz import timezone, utc
from urllib.parse import urlencode
import json
import re
import requests
import sys
def load_config(file):
    """Read the JSON document stored at path *file* and return the parsed value.

    The file is decoded explicitly as UTF-8 (the encoding JSON interchange
    uses) rather than with the platform's locale encoding, so non-ASCII
    values load the same way on every OS.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the contents are not valid JSON.
    """
    with open(file, encoding='utf-8') as f:
        return json.load(f)
# Module-level configuration, loaded once when this file is executed or
# imported. 'secret.json' is a relative path, so it is resolved against the
# current working directory. api_url() reads config['client']['oauth'] from
# it to obtain the API token.
config = load_config('secret.json')
def api_url(method, args):
    """Build the full Slack Web API URL for *method* with *args* as the query string.

    Args:
        method: API method name, e.g. 'channels.list'.
        args: Mapping of query parameters. It is not modified; the token is
            added to a copy so callers can safely reuse their dict.

    Returns:
        The URL as a string, with the token from config['client']['oauth']
        included as the 'token' query parameter.
    """
    # Copy instead of assigning into `args`, so the secret token never leaks
    # into a dict owned by the caller.
    params = dict(args, token=config['client']['oauth'])
    return 'https://slack.com/api/{method}?{args}'.format(method=method, args=urlencode(params))
def api_request(method, **args):
    """Perform a GET request against Slack API *method* and return the decoded reply.

    Args:
        method: API method name, e.g. 'users.info'.
        **args: Query parameters forwarded to api_url().

    Returns:
        The JSON response body as a dict, only when its 'ok' field is truthy.

    Raises:
        Exception: if the HTTP status is not 200, or if the body's 'ok'
            field is falsy.
        requests.exceptions.RequestException: on connection problems,
            including the timeout below.
    """
    # Without a timeout, requests waits indefinitely on a stalled connection,
    # which would hang the whole dump.
    r = requests.get(api_url(method, args), timeout=30)
    if r.status_code != 200:
        raise Exception("Error on request: {}".format(r))
    resp = json.loads(r.text)
    if not resp['ok']:
        raise Exception("Not ok: {}".format(resp))
    return resp
def all_channels(cursor=None):
    """Yield every channel object from 'channels.list', following pagination.

    Args:
        cursor: Pagination cursor to start from. None (the default) sends no
            cursor parameter, i.e. starts from the first page.

    Yields:
        Each element of the 'channels' list of every page, in the order the
        API returned them.
    """
    while True:
        args = {} if cursor is None else {'cursor': cursor}
        resp = api_request('channels.list', **args)
        yield from resp['channels']
        # Per Slack's cursor-pagination docs, an empty, null, or absent
        # next_cursor means there are no further pages. Testing only for the
        # key's presence would follow an empty cursor and request the first
        # page again, forever.
        cursor = (resp.get('response_metadata') or {}).get('next_cursor')
        if not cursor:
            return
def all_messages(channel, oldest):
    """Yield the messages of *channel* newer than *oldest*, page by page.

    Each page is fetched with 'channels.history' (count=1000) and yielded in
    the reverse of the order the API returned it. While the response reports
    'has_more', the next page is requested with `oldest` set to the
    timestamp of the first message of the page just yielded.

    N.B. this depends on the contract that if we only pass oldest (and not
    latest as well) the API returns the messages nearest oldest rather than
    nearest latest.
    """
    while True:
        page = api_request('channels.history', channel=channel, oldest=oldest, count=1000)
        batch = page['messages']
        if not batch:
            return
        yield from reversed(batch)
        if not page['has_more']:
            return
        # The first element of the page becomes the lower bound for the next
        # request.
        oldest = float(batch[0]['ts'])
def channel_ids():
    """Return a dict mapping each channel's 'name' to its 'id'.

    The channels come from all_channels(); if two channels share a name the
    later one wins.
    """
    ids_by_name = {}
    for channel in all_channels():
        name = channel['name']
        ids_by_name[name] = channel['id']
    return ids_by_name
def get_user(id, cache):
    """Return the real name of the user with the given *id*, memoized in *cache*.

    Args:
        id: User id passed as the 'user' argument of 'users.info'.
        cache: Mutable mapping of user id -> real name. It is consulted
            first and updated in place on a miss.

    Returns:
        The cached name, or on a miss the 'real_name' of the 'user' object
        returned by 'users.info'.
    """
    if id in cache:
        return cache[id]
    # Cache miss: one API round trip, remembered for subsequent calls.
    real_name = api_request('users.info', user=id)['user']['real_name']
    cache[id] = real_name
    return cache[id]
# Matches a user mention in message text, either <@USERID> or the labelled
# form <@USERID|label>. Only the id (the part before any "|") is captured, so
# the label is never sent to the user lookup.
MENTION_RE = re.compile(r'<@([^|>]+)(?:\|[^>]*)?>')


def format_message(message, stamp, lookup_user):
    """Render one message dict as a single output line.

    Args:
        message: Message dict. 'text', 'user' and 'subtype' are read when
            present.
        stamp: Already-formatted timestamp string placed inside the brackets.
        lookup_user: Callable mapping a user id to a display name. It is
            used for the author and for every mention in the text.

    Returns:
        '[stamp] text' for 'channel_join' messages, otherwise
        '[stamp] author: text'.
    """
    text = MENTION_RE.sub(
        lambda mention: lookup_user(mention.group(1)),
        message.get('text') or '',
    )
    if message.get('subtype') == 'channel_join':
        # The join text already names the user, so no author prefix.
        return '[{}] {}'.format(stamp, text)
    if 'user' in message:
        author = lookup_user(message['user'])
    else:
        # Some messages have no 'user' field. Presumably these are bot
        # messages carrying 'username'/'bot_id' instead (TODO confirm
        # against the Slack message schema).
        author = message.get('username') or message.get('bot_id') or 'unknown'
    return '[{}] {}: {}'.format(stamp, author, text)


def main(argv):
    """Dump a channel's messages since a timestamp, one line per message.

    Args:
        argv: Command-line vector: argv[1] is the channel name and argv[2]
            the oldest timestamp (seconds since the epoch, as a float).

    Exits with a message on stderr if arguments are missing or malformed,
    or if the channel name is not found.
    """
    if len(argv) < 3:
        sys.exit('usage: {} CHANNEL_NAME OLDEST_TIMESTAMP'.format(argv[0] if argv else 'dump'))
    channel_name = argv[1]
    try:
        start = float(argv[2])
    except ValueError:
        sys.exit('OLDEST_TIMESTAMP must be a number, got: {}'.format(argv[2]))

    ids = channel_ids()
    if channel_name not in ids:
        sys.exit('unknown channel: {}'.format(channel_name))

    users = dict()
    eastern = timezone('US/Eastern')

    def lookup_user(user_id):
        # Share one cache across the whole run so each user is fetched once.
        return get_user(user_id, users)

    for m in all_messages(ids[channel_name], start):
        stamp = datetime.fromtimestamp(float(m['ts']), utc).astimezone(eastern).strftime('%a %I:%M %p')
        print(format_message(m, stamp, lookup_user))


if __name__ == '__main__':
    main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment