Skip to content

Instantly share code, notes, and snippets.

@busbey
Last active January 13, 2018 05:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save busbey/434d83bafb8ef9faa2376b616f295dea to your computer and use it in GitHub Desktop.
# Run with --help for cli options
#
# Look below for the section marked XXX on how to enable deletes
#
# python2 old_slack_files.py --aggregate-by-type --domain example path/to/my/example.oauth.token.file
#
# Original content from
#
# https://www.shiftedup.com/2014/11/13/how-to-bulk-remove-files-from-slack
#
# Modifications (c) Sean Busbey and licensed under ALv2
#
# changes:
# * don't delete unless opt-in
# * add arguments for limiting to just one user
# * handle result pagination
# * don't delete starred items unless told to
# * don't delete private items unless told to
# * add arguments for domain and OAuth token
# * add summary of bytes for requested deletes
# * add summary of things we skip
# * don't delete pinned items unless told to
# * add argument for how old files must be to be eligible
# * optionally print file information
# * optionally provide breakdown of files by author and type of file
# * take user arg optionally as an email address
#
import argparse
import pprint
import requests
import json
import sys
import calendar
from datetime import datetime, timedelta
import locale
# Use en_US so locale.format("%d", ..., grouping=True) renders byte counts
# with thousands separators in the summaries printed later.
locale.setlocale(locale.LC_ALL, 'en_US')
# Command-line interface. Defaults are conservative: read-only operation, and
# private/starred/pinned files are excluded unless explicitly opted in.
parser = argparse.ArgumentParser()
parser.add_argument('--delete', action='store_true', help='delete summarized files. default is read-only.')
# TODO make this a list of users
parser.add_argument('-u', '--only-user', help='only include files from the given user (use email or internal Slack ID)')
parser.add_argument('-i', '--include-private', action='store_true', help='include non-public items.')
parser.add_argument('-s', '--include-starred', action='store_true', help='include starred items.')
parser.add_argument('--domain', default="theparlour", help='slack domain, eg https://theparlour.slack.com/ => theparlour')
parser.add_argument('-d', '--days-old', type=int, default=90, help='only select files that are at least this age in days. pass 0 for "everything". default 90.')
parser.add_argument('-p', '--include-pinned', action='store_true', help='include pinned items.')
parser.add_argument('-l', '--info', action='store_true', help='print out information about selected files.')
parser.add_argument('--debug', action='store_true', help='give more details about in-progress effort.')
parser.add_argument('token_file', type=argparse.FileType('r'), help='should contain just the OAUTH token from your installed app.')
parser.add_argument('-t', '--only-type', default="images,videos,pdfs", help='only include files of the given type. valid options: all, images, videos, zips, pdfs, spaces, snippets, gdocs default: images,videos,pdfs')
parser.add_argument('--aggregate-by-type', action='store_true', help='provide counts aggregated by file type')
parser.add_argument('--aggregate-by-user', action='store_true', help='provide counts aggregated by authoring user ID')
parser.add_argument('--user-names', action='store_true', help='when printing file info or aggregates, use user name instead of slack ID')
# NOTE(review): parse_args() runs at module import time, so importing this
# file requires valid CLI args; consider moving under __main__ if reused.
args = parser.parse_args()
# TODO list of tokens, one per line in the file.
# The first line of the token file is the OAuth token used for all API calls.
_token = args.token_file.readline().strip()
def lookup_user(id, cache, token):
    """Return the Slack user record for *id*, memoizing results in *cache*.

    On a cache miss the Slack ``users.info`` API is queried; if the call
    fails (``ok`` is false), a minimal stub ``{'id': id, 'name': id}`` is
    cached instead, so callers always receive a dict with at least ``id``
    and ``name`` keys and a bad ID never raises.

    NOTE: ``id`` shadows the builtin, but the parameter name is kept for
    backward compatibility with any keyword-argument callers.
    """
    if id not in cache:
        result = requests.post('https://slack.com/api/users.info', data={
            'token': token,
            'user': id
        }).json()
        # Cache a stub on failure so we only hit the API once per ID.
        cache[id] = result["user"] if result["ok"] else {'id': id, 'name': id}
    return cache[id]
if __name__ == '__main__':
    # Resolve --only-user (email address or Slack ID) to a full user record
    # so we can filter by the canonical internal ID below.
    if args.only_user:
        if '@' in args.only_user:
            request_url = 'https://slack.com/api/users.lookupByEmail'
            request_data = {'token': _token, 'email': args.only_user}
        else:
            request_url = 'https://slack.com/api/users.info'
            request_data = {'token': _token, 'user': args.only_user}
        result = requests.post(request_url, data=request_data).json()
        if result["ok"]:
            author = result["user"]
        else:
            # Single-expression print() calls are identical under py2 and py3.
            print("Couldn't find user given in the --only-user option.")
            if args.debug:
                pprint.pprint(result)
            sys.exit(-1)
    if args.debug:
        if args.only_user:
            print("Only include files from %s (id %s)" % (author["name"], author["id"]))
        if args.only_type:
            print("Only include files of type %s" % (args.only_type))
        if args.days_old:
            print("picking out files that are older than %i days" % (args.days_old))
    if not args.delete:
        print("Read only mode. Will summarize files but no deleting. Pass --delete to attempt removing files.")
    # Pagination state and running totals. `pages` is corrected from the
    # first API response inside the loop.
    page = 1
    pages = 1
    users = {}  # user-id -> user record cache shared with lookup_user
    files_to_delete = []
    # TODO summary by user
    bytes_deleted = 0
    files_skipped_star = 0
    bytes_skipped_star = 0
    files_skipped_private = 0
    bytes_skipped_private = 0
    files_skipped_pinned = 0
    bytes_skipped_pinned = 0
    # None disables aggregation entirely; checked with truthiness below.
    files_aggregate = {'total': {'count': 0, 'bytes': 0}} if args.aggregate_by_type or args.aggregate_by_user else None
    while page <= pages:
        files_list_url = 'https://slack.com/api/files.list'
        # ts_to is a Unix timestamp; only files older than --days-old match.
        date = str(calendar.timegm((datetime.now() + timedelta(-1 * args.days_old))
                                   .utctimetuple()))
        data = {"token": _token, "ts_to": date, "page": page, "types": args.only_type}
        if args.only_user:
            data["user"] = author["id"]
        response = requests.post(files_list_url, data=data)
        result = response.json()
        if len(result["files"]) == 0:
            break
        if args.debug:
            print("results include %i files on page %i, of total %i files on %i pages" % (len(result["files"]), result["paging"]["page"], result["paging"]["total"], result["paging"]["pages"]))
        pages = result["paging"]["pages"]
        for f in result["files"]:
            # Double-check authorship even though the request was filtered.
            if args.only_user and author["id"] != f["user"]:
                continue
            if not args.include_starred and "num_stars" in f and f["num_stars"] > 0:
                files_skipped_star += 1
                bytes_skipped_star += f["size"]
                continue
            if not args.include_private and not f["is_public"]:
                files_skipped_private += 1
                bytes_skipped_private += f["size"]
                continue
            if not args.include_pinned and "pinned_to" in f and len(f["pinned_to"]) > 0:
                files_skipped_pinned += 1
                bytes_skipped_pinned += f["size"]
                continue
            bytes_deleted += f["size"]
            files_to_delete.append(f)
            if args.info:
                print("Info on '" + f["name"] + "':")
                print("\tFile is %s" % ("public" if f["is_public"] else "private"))
                print("\tFile title: %s" % (f["title"]))
                print("\tFile author: %s" % (lookup_user(f["user"], users, _token)["name"] if args.user_names else f["user"]))
                print("\tFile type: %s" % (f["filetype"]))
                print("\tFile size: %i" % (f["size"]))
                print("\tFile has %i comments" % (f["comments_count"]))
                print("\tFile is in %i channels" % (len(f["channels"])))
                print("\tFile is in %i groups" % (len(f["groups"])))
                print("\tFile is in %i ims" % (len(f["ims"]) if "ims" in f else 0))
                print("\tFile has %i stars" % (f["num_stars"] if "num_stars" in f else 0))
                print("\tFile is pinned in %i places" % (len(f["pinned_to"]) if "pinned_to" in f else 0))
            if files_aggregate:
                # Descend total -> per-user -> per-type, collecting every
                # counter dict this file contributes to, then bump them all.
                aggregate_dict = files_aggregate
                increments = [files_aggregate['total']]
                if args.aggregate_by_user:
                    user_key = lookup_user(f["user"], users, _token)["name"] if args.user_names else f["user"]
                    if user_key not in aggregate_dict:
                        aggregate_dict[user_key] = {'total': {'count': 0, 'bytes': 0}} if args.aggregate_by_type else {'count': 0, 'bytes': 0}
                    aggregate_dict = aggregate_dict[user_key]
                    if args.aggregate_by_type:
                        increments.append(aggregate_dict['total'])
                    else:
                        increments.append(aggregate_dict)
                if args.aggregate_by_type:
                    if f["filetype"] not in aggregate_dict:
                        aggregate_dict[f["filetype"]] = {'count': 0, 'bytes': 0}
                    increments.append(aggregate_dict[f["filetype"]])
                for incr in increments:
                    incr['count'] += 1
                    incr['bytes'] += f['size']
        page += 1
    if files_skipped_star > 0:
        print("skipped %i files because they were starred. would have claimed %s additional bytes. pass --include-starred to include them." % (files_skipped_star, locale.format("%d", bytes_skipped_star, grouping=True)))
    if files_skipped_private > 0:
        print("skipped %i files because they were private. would have claimed %s additional bytes. pass --include-private to include them." % (files_skipped_private, locale.format("%d", bytes_skipped_private, grouping=True)))
    if files_skipped_pinned > 0:
        print("skipped %i files because they were pinned. would have claimed %s additional bytes. pass --include-pinned to include them." % (files_skipped_pinned, locale.format("%d", bytes_skipped_pinned, grouping=True)))
    if args.delete:
        print("Attempting to reclaim %s bytes from %i files." % (locale.format("%d", bytes_deleted, grouping=True), len(files_to_delete)))
        failed_count = 0
        failed_bytes = 0
        for f in files_to_delete:
            if args.debug:
                print("Deleting file " + f["name"] + "...")
            timestamp = str(calendar.timegm(datetime.now().utctimetuple()))
            delete_url = "https://" + args.domain + ".slack.com/api/files.delete?t=" + timestamp
            # XXX Delete this line and the one below, then uncomment the following lines when ready for file deletion
            delete_response = {"ok": False}
            # delete_response = requests.post(delete_url, data = {
            #     "token": _token,
            #     "file": f["id"],
            #     "set_active": "true",
            #     "_attempts": "1"}).json()
            if not delete_response["ok"]:
                failed_count += 1
                failed_bytes += f["size"]
                if args.debug:
                    print("Failed to delete file %s" % (f["name"]))
                    pprint.pprint(delete_response)
        if failed_count > 0:
            print("Failed to remove %i files, for a total of %s bytes." % (failed_count, locale.format("%d", failed_bytes, grouping=True)))
        print("DONE!")
    else:
        print("Total: %s bytes from %i files." % (locale.format("%d", bytes_deleted, grouping=True), len(files_to_delete)))
        if files_aggregate:
            print("breakdown of files:")
            pprint.pprint(files_aggregate)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment