Last active
December 13, 2017 23:31
-
-
Save gigamonkey/1476bdcc7dad7fd2eaead854202e02cd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
#
# Dump all the messages from a given Slack channel since a certain
# timestamp, one line per message with timestamp and user name.
#
from datetime import datetime | |
from pytz import timezone, utc | |
from urllib.parse import urlencode | |
import json | |
import re | |
import requests | |
import sys | |
def load_config(file):
    """Read the JSON document at path `file` and return the parsed value.

    The file is opened explicitly as UTF-8 so that decoding does not
    depend on the platform's default locale encoding.
    """
    with open(file, encoding='utf-8') as f:
        return json.load(f)
# Loaded once, at import time, from 'secret.json' (a relative path, so it is
# resolved against the current working directory). api_url reads the API
# token from config['client']['oauth'].
config = load_config('secret.json')
def api_url(method, args):
    """Build the GET URL for a Slack Web API call.

    method -- API method name, e.g. 'channels.list'.
    args   -- dict of query parameters; it is left unmodified.

    Returns 'https://slack.com/api/<method>?<query>', where the query is
    the url-encoded `args` plus a 'token' entry taken from
    config['client']['oauth'].
    """
    # Build a new dict rather than inserting the token into the caller's.
    params = dict(args, token=config['client']['oauth'])
    return 'https://slack.com/api/{method}?{args}'.format(method=method, args=urlencode(params))
def api_request(method, **args):
    """Call a Slack Web API method over HTTP GET and return the decoded reply.

    method -- API method name, passed to api_url.
    **args -- query parameters for the call.

    Returns the parsed JSON response when the HTTP status is 200 and the
    payload's 'ok' field is truthy. Raises Exception in either other case;
    requests may also raise its own exceptions (e.g. on timeout).
    """
    # Without a timeout, requests waits indefinitely on a stalled connection.
    r = requests.get(api_url(method, args), timeout=30)
    if r.status_code != 200:
        raise Exception("Error on request: {}".format(r))
    resp = json.loads(r.text)
    if not resp['ok']:
        raise Exception("Not ok: {}".format(resp))
    return resp
def all_channels(cursor=None):
    """Yield every channel object returned by 'channels.list', page by page.

    cursor -- optional pagination cursor to start from; None requests the
              first page.

    NOTE(review): Slack has been retiring the channels.* methods in favour
    of conversations.*; confirm this endpoint is still available for the
    target workspace.
    """
    while True:
        args = {}
        if cursor is not None:
            args['cursor'] = cursor
        resp = api_request('channels.list', **args)
        yield from resp['channels']
        # The last page is signalled by a missing *or empty* next_cursor.
        # Testing only for the key's presence would re-request with
        # cursor='' and never terminate.
        cursor = resp.get('response_metadata', {}).get('next_cursor')
        if not cursor:
            return
def all_messages(channel, oldest):
    """Yield the messages of `channel` starting from `oldest`, oldest first.

    channel -- channel ID passed to 'channels.history'.
    oldest  -- timestamp passed as the 'oldest' parameter of the first
               request.

    Fetches up to 1000 messages per request and keeps requesting until a
    page is empty or the response no longer reports 'has_more'.
    """
    # N.B. this depends on the contract that if we only pass oldest
    # (and not latest as well) the API returns the messages nearest
    # oldest rather than nearest latest.
    while True:
        resp = api_request('channels.history', channel=channel, oldest=oldest, count=1000)
        messages = resp['messages']
        if not messages:
            return
        # Each page is treated as newest-first, so reverse it to emit
        # messages in chronological order.
        yield from reversed(messages)
        if not resp.get('has_more'):
            return
        # Resume after the newest message of this page. The 'ts' string is
        # passed on verbatim so no precision is lost in a float round trip.
        oldest = messages[0]['ts']
def channel_ids():
    """Return a dict mapping each channel's name to its ID.

    Built from every channel object produced by all_channels(); if two
    channels share a name, the later one wins.
    """
    ids_by_name = {}
    for channel in all_channels():
        name = channel['name']
        ids_by_name[name] = channel['id']
    return ids_by_name
def get_user(id, cache):
    """Return a display name for Slack user `id`, memoised in `cache`.

    id    -- Slack user ID.
    cache -- dict mapping user ID to name; updated in place on a miss.

    On a cache miss this calls 'users.info'. The user's 'real_name' is
    preferred; if it is missing or empty the 'name' field is used, and
    failing that the raw ID, so that one incomplete user object does not
    abort the whole run with a KeyError.
    """
    try:
        return cache[id]
    except KeyError:
        pass
    user = api_request('users.info', user=id)['user']
    cache[id] = user.get('real_name') or user.get('name') or id
    return cache[id]
def _format_message(m, users, tz):
    """Render one message dict from all_messages() as a single output line.

    m     -- message dict; reads 'ts' and 'text', and optionally 'user',
             'username' and 'subtype'.
    users -- user-ID -> name cache shared with get_user.
    tz    -- tzinfo the timestamp is converted to before formatting.
    """
    stamp = datetime.fromtimestamp(float(m['ts']), utc).astimezone(tz).strftime('%a %I:%M %p')
    # A mention is written <@ID> or <@ID|label>; only the ID part is looked
    # up, so the optional '|label' suffix must not end up in the capture.
    text = re.sub(r'<@([^>|]+)(?:\|[^>]*)?>', lambda match: get_user(match.group(1), users), m['text'])
    if m.get('subtype') == 'channel_join':
        return '[{}] {}'.format(stamp, text)
    if 'user' in m:
        user = get_user(m['user'], users)
    else:
        # Presumably a bot/integration message, which may carry no 'user'
        # key; fall back to its 'username' if present -- verify against
        # the Slack message schema.
        user = m.get('username', 'unknown')
    return '[{}] {}: {}'.format(stamp, user, text)


def main(argv):
    """Print every message of a channel since a timestamp, one per line.

    argv -- [program, channel_name, oldest_timestamp]; extra items are
            ignored. Exits with a message if arguments are missing or the
            channel name is not among those returned by channel_ids().
    """
    if len(argv) < 3:
        sys.exit('usage: {} CHANNEL_NAME OLDEST_TIMESTAMP'.format(argv[0]))
    name = argv[1]
    # Parse before any network traffic so a bad timestamp fails fast.
    start = float(argv[2])
    ids = channel_ids()
    if name not in ids:
        sys.exit('unknown channel: {}'.format(name))
    users = dict()
    eastern = timezone('US/Eastern')
    for m in all_messages(ids[name], start):
        print(_format_message(m, users, eastern))


if __name__ == '__main__':
    main(sys.argv)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment