@danielcarr
Created February 4, 2019 12:11
Clear up space in a Slack workspace by deleting all files older than a certain number of days, after downloading and archiving them by month.
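After a run, each month's worth of deleted files is left as a zip archive inside the backup directory, e.g. (month names illustrative):

    backup/201811.zip
    backup/201812.zip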
#! /usr/bin/env python3
import json
import os
import time
import re as regex
from zipfile import ZipFile as zipfile, is_zipfile
import requests
MINIMUM_AGE = 90 # The age of files to delete, in days
BACKUP_PATH = 'backup' # The directory in which to backup deleted files
SLACK_API = 'https://slack.com/api/'
ENDPOINT_LIST_FILES = SLACK_API + 'files.list'
ENDPOINT_DELETE_FILE = SLACK_API + 'files.delete'
ENDPOINT_SEND_MESSAGE = SLACK_API + 'chat.postMessage'
# Tokens from https://api.slack.com/apps/<app_id>/oauth
ADMIN_TOKEN = 'xoxp-...' # Admin user token
BOT_TOKEN = 'xoxb-...' # Housekeeping slack bot token
NOTIFICATION_CHANNEL = '<channel_id>' # The channel on which to post result messages
DAY_SECONDS = 24 * 60 * 60

def list_old_files(days=30):
    """Yield pages of Slack files older than `days` days."""
    timestamp = int(time.time()) - (days * DAY_SECONDS)
    params = {'token': ADMIN_TOKEN, 'count': 200, 'ts_to': timestamp}
    pages = 2 # assume there's more than one page to get started
    while pages > 1:
        # Always fetch the first page: the caller deletes each batch before
        # the next request, so the listing shrinks between iterations.
        response = requests.get(ENDPOINT_LIST_FILES, params=params)
        data = json.loads(response.text)
        yield data['files']
        pages = data['paging']['pages']

def download_file(slack_file, path):
    """Download a Slack file into <path>/<YYYYMM>/, renaming it if the name is taken."""
    timestamp = time.localtime(slack_file['timestamp'])
    month = time.strftime('%Y%m', timestamp)
    name = slack_file['title']
    url = slack_file['url_private']
    auth_header = {'Authorization': f'Bearer {ADMIN_TOKEN}'}
    directory = os.path.join(path, month)
    duplicate_pattern = regex.compile(r'(.*) \(([0-9]+)\)')
    if not os.path.isdir(path):
        os.mkdir(path)
    if not os.path.isdir(directory):
        os.mkdir(directory)
    # If a file with this name was already downloaded, add or bump a " (n)"
    # suffix until the name is unique within the month's directory.
    while name in os.listdir(directory):
        filename, ext = os.path.splitext(name)
        match = duplicate_pattern.fullmatch(filename)
        if match is None:
            name = f'{filename} (1){ext}'
        else:
            basename = match.group(1)
            count = int(match.group(2)) + 1
            name = f'{basename} ({count}){ext}'
    with open(os.path.join(directory, name), 'wb') as download:
        response = requests.get(url, headers=auth_header)
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                download.write(chunk)

def archive_files(directory):
    """Add every file in `directory` to <directory>.zip, renaming duplicates on the way in."""
    duplicate_pattern = regex.compile(r'(.*) \(([0-9]+)\)')
    archive_name = f'{directory}.zip'
    archived_files = []
    if not os.path.exists(archive_name) or not is_zipfile(archive_name):
        # Create the archive (or overwrite a corrupt one) so it can be opened in append mode.
        archive = zipfile(archive_name, 'w')
        archive.close()
    else:
        with zipfile(archive_name, 'r') as archive:
            archived_files = archive.namelist()
    with zipfile(os.path.realpath(f'{directory}.zip'), 'a') as archive:
        start_directory = os.path.realpath(os.path.curdir)
        os.chdir(directory)
        files = os.listdir()
        for filename in files:
            new_name = filename
            duplicated = False
            # Bump the " (n)" suffix until the name clashes with neither the
            # archive contents nor another file waiting in this directory.
            while new_name in archived_files or duplicated:
                name, ext = os.path.splitext(new_name)
                match = duplicate_pattern.fullmatch(name)
                if match is None:
                    new_name = f'{name} (1){ext}'
                else:
                    basename = match.group(1)
                    count = int(match.group(2)) + 1
                    new_name = f'{basename} ({count}){ext}'
                duplicated = new_name in files
            if filename != new_name:
                os.rename(filename, new_name)
                filename = new_name
            archive.write(filename)
            archived_files.append(filename)
        os.chdir(start_directory)

def delete_files(file_list):
    parameters = {'token': ADMIN_TOKEN}
    error_count, warning_count = 0, 0
    count = 0
    total_bytes = 0
    for f in file_list:
        parameters['file'] = f['id']
        response = requests.post(ENDPOINT_DELETE_FILE, params=parameters)
        data = json.loads(response.text)
        if data['ok']:
            warning = data.get('warning')
            if warning is not None:
                warning_count += 1
                print(f'WARNING | {warning}')
            bytesize = f['size']
            print(f"DELETED | {f['title']} ({bytesize} bytes)")
            count += 1
            total_bytes += bytesize
        else:
            error_count += 1
            localtime = time.localtime(f['timestamp'])
            timestamp = time.strftime('%Y-%m-%d %H:%M:%S', localtime)
            print(f"ERROR | {data.get('error')} for {f['title']} from {timestamp}")
        time.sleep(2) # avoid getting ratelimited
    return {'files': count, 'bytes': total_bytes, 'warnings': warning_count, 'errors': error_count}

def post_notification(operation_info):
    parameters = {
        'token': BOT_TOKEN,
        'icon_emoji': ':wastebasket:',
        'username': 'housekeeping',
        'channel': NOTIFICATION_CHANNEL
    }
    files = operation_info.get('files')
    saving = operation_info.get('bytes')
    warnings = operation_info.get('warnings')
    errors = operation_info.get('errors')
    if files is not None and files > 0 and saving is not None and saving > 0:
        parameters['text'] = f'Saved {saving} bytes by deleting {files} old files (with {warnings} warnings)'
        requests.post(ENDPOINT_SEND_MESSAGE, params=parameters)
    if errors is not None and errors > 0:
        parameters['text'] = f'Encountered {errors} errors while deleting old files'
        requests.post(ENDPOINT_SEND_MESSAGE, params=parameters)

def get_unique_name(filename, prior_names):
    """Return `filename`, adding or bumping a " (n)" suffix until it is not in `prior_names`."""
    duplicate_pattern = regex.compile(r'(.*) \(([0-9]+)\)')
    while filename in prior_names:
        name, ext = os.path.splitext(filename)
        match = duplicate_pattern.fullmatch(name)
        if match is None:
            filename = f'{name} (1){ext}'
        else:
            basename = match.group(1)
            count = int(match.group(2)) + 1
            filename = f'{basename} ({count}){ext}'
    return filename

if __name__ == '__main__':
    operation_info = {'files': 0, 'bytes': 0, 'warnings': 0, 'errors': 0}
    for files in list_old_files(MINIMUM_AGE):
        for f in files:
            download_file(f, BACKUP_PATH)
        result = delete_files(files)
        for k in operation_info.keys():
            operation_info[k] += result[k]
    print(f"Files deleted: {operation_info['files']}")
    print(f"Total bytes saved: {operation_info['bytes']}")
    print(f"Warnings: {operation_info['warnings']} | Errors: {operation_info['errors']}")
    post_notification(operation_info)
    # Archive the downloaded files by month and clean up the loose copies.
    start_directory = os.path.realpath(os.path.curdir)
    os.chdir(BACKUP_PATH)
    for folder in os.listdir():
        if os.path.isdir(folder):
            archive_files(folder)
            for f in os.listdir(folder):
                if not os.path.isdir(os.path.join(folder, f)):
                    os.remove(os.path.join(folder, f))
            os.rmdir(folder)
    os.chdir(start_directory)
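
For a quick sanity check before running the script for real, a dry-run sketch along these lines lists the first page of matching files without downloading or deleting anything (it assumes the script above has been saved as slack_housekeeping.py, with the token and channel placeholders filled in):

# Dry run: show what would be deleted, without touching anything.
from slack_housekeeping import MINIMUM_AGE, list_old_files

old_files = next(list_old_files(MINIMUM_AGE), [])  # first page only
for f in old_files:
    print(f"{f['title']} ({f['size']} bytes)")
print(f"{len(old_files)} files, {sum(f['size'] for f in old_files)} bytes in total")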