Skip to content

Instantly share code, notes, and snippets.

@bboe
Last active April 14, 2022 09:54
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bboe/0306fa90271ffc4c21add661d98a81f3 to your computer and use it in GitHub Desktop.
Save bboe/0306fa90271ffc4c21add661d98a81f3 to your computer and use it in GitHub Desktop.
Set retention on slack conversations with deleted users to 1 day.
#!/usr/bin/env python
"""Set retention on slack conversations to 400 days.
Usage: ./set_retention.py [--perform] --token SLACK_TOKEN
The `--perform` flag is necessary to actually perform the action.
Due to the fact that `conversations.setToken` is not part of the public API
the slack legacy token concept will not work. To obtain the API token,
open up slack in the browser and observe networking traffic while performing
a few POST actions on the desired slack workspace. The data field to the POST
message should contain a token beginning with `xoxs-`; that's your token.
"""
import argparse
import os
import sys
import time
import slacker
from requests import HTTPError, ReadTimeout, Session
RETENTION_PLAN = {'retention_duration': 400, 'retention_type': 1}
def handle_rate_limit(method, *args, **kwargs):
count = 5
while count > 0:
try:
response = method(*args, **kwargs)
assert response.successful
assert response.body['ok']
return response
except HTTPError as exception:
if exception.response.status_code == 429:
retry_time = int(exception.response.headers['retry-after'])
retry_time = min(3, retry_time)
if retry_time > 3:
print('Sleeping for {} seconds'.format(retry_time))
time.sleep(retry_time)
else:
raise
except ReadTimeout:
print('Read timeout. Sleeping for 16 seconds')
time.sleep(16)
count -= 1
raise Exception('Max retries exceeded')
def main():
parser = argparse.ArgumentParser(
description='Set retention timeout to 1 day for some conversations',
usage='%(prog)s [--token]')
parser.add_argument('--perform', action='store_true',
help='Actually perform setRetention action')
parser.add_argument('--token', help=(
'The token used to connect to slack. This value can also be passed via'
'the SLACK_TOKEN environment variable.'))
args = parser.parse_args()
token = args.token or os.getenv('SLACK_TOKEN')
if not token:
sys.stderr.write('Either the argument --token or the environment '
'variable SLACK_TOKEN must be provided\n')
return 1
with Session() as session:
slack = slacker.Slacker(token, session=session)
by_id = users(slack)
set_retention_on_direct_messages(slack, by_id, args.perform)
set_retention_on_multiuser_messages(slack, args.perform)
return 0
def paginate(method, *args, collection, params=None):
if params is None:
params = {'limit': 1000}
else:
params.setdefault('limit', 1000)
while params.get('cursor', True):
response = method(*args, params=params)
assert response.successful
for conversation in response.body[collection]:
yield conversation
params['cursor'] = response.body['response_metadata']['next_cursor']
def private_conversations(slack, types):
return paginate(slack.api.get, 'conversations.list',
collection='channels', params={'types': types})
def get_retention(slack, channel):
response = handle_rate_limit(slack.api.post, 'conversations.getRetention',
data={'channel': channel})
assert response.successful
return response.body['retention']
def message_count(slack, channel):
response = handle_rate_limit(slack.api.post, 'search.messages',
data={'query': 'in:<#{}>'.format(channel)})
assert response.successful
return response.body['messages']['total']
def set_retention(slack, channel, retention_plan):
response = handle_rate_limit(
slack.api.post, 'conversations.setRetention', data={
'channel': channel, **retention_plan})
assert response.successful
def set_retention_on_direct_messages(slack, users, perform):
for conversation in private_conversations(slack, 'im'):
print(users[conversation['user']])
if message_count(slack, conversation['id']) <= 0:
continue
current_plan = get_retention(slack, conversation['id'])
proposed_plan = RETENTION_PLAN
if not should_update_retention(current_plan):
continue
print('\t {} -> {}'.format(current_plan, proposed_plan))
if perform:
set_retention(slack, conversation['id'], proposed_plan)
def set_retention_on_multiuser_messages(slack, perform):
for conversation in private_conversations(slack, 'mpim'):
assert conversation['name'].startswith('mpdm-')
assert conversation['name'].endswith('-1')
users = conversation['name'][5:-2].split('--')
if message_count(slack, conversation['id']) <= 0:
continue
current_plan = get_retention(slack, conversation['id'])
proposed_plan = RETENTION_PLAN
if not should_update_retention(current_plan, proposed_plan):
continue
print(', '.join(sorted(users)))
print('\t {} -> {}'.format(current_plan, proposed_plan))
if perform:
set_retention(slack, conversation['id'], proposed_plan)
def should_update_retention(current, proposed):
assert proposed['retention_type'] == 1
assert proposed['retention_duration'] > 0
return int(current['retention_type']) == 0 or (
int(current['retention_duration']) > proposed['retention_duration'])
def users(slack):
by_id = {}
for user in paginate(slack.api.get, 'users.list', collection='members'):
by_id[user['id']] = user['name']
return by_id
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment