Created
June 14, 2021 20:59
-
-
Save gleeblezoid/a2fabcc93a38a465fcc71765ab69db40 to your computer and use it in GitHub Desktop.
Slack channel archive and unarchive tool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import requests | |
import datetime | |
import csv | |
import sys | |
# Authentication variables | |
bot_token = os.environ['SLACK_BOT_TOKEN'] | |
user_token = os.environ['SLACK_USER_TOKEN'] | |
# Get the list of Slack channels in the workspace | |
def getChannels(): | |
list_url = "https://slack.com/api/conversations.list?limit=1000" | |
payload = {} | |
headers = { | |
'Authorization': 'Bearer '+bot_token | |
} | |
list_response = requests.request( | |
"GET", list_url, headers=headers, data=payload) | |
i = 0 | |
dictOfChannels = {} | |
while i < len(list_response.json()['channels']): | |
channel_id = (list_response.json()['channels'][i]['id']) | |
channel_name = (list_response.json()['channels'][i]['name']) | |
dictOfChannels[channel_id] = channel_name | |
i += 1 | |
return dictOfChannels | |
# Extract channel history based on channel ID | |
def getLastMessageTimestamp(channel_id): | |
history_url = "https://slack.com/api/conversations.history?channel=" + \ | |
channel_id+"&latest=&limit=1" | |
payload = {} | |
headers = { | |
'Authorization': 'Bearer ' + user_token | |
} | |
history_response = requests.request( | |
"GET", history_url, headers=headers, data=payload) | |
# Retrieve timestamp from most recent message in a channel | |
lastMessageTimestamp = (datetime.datetime.fromtimestamp( | |
float(history_response.json()['messages'][0]['ts']))) | |
return lastMessageTimestamp | |
# Return the names of channels with a latest message older than six months | |
def channelsForArchiving(): | |
sixMonthsAgo = datetime.datetime.today()-datetime.timedelta(days=180) | |
dictofchannels = getChannels() | |
listofchannelIDs = dictofchannels.keys() | |
listOfDormantChannelIDs = [] | |
for x in listofchannelIDs: | |
try: | |
timestamp = getLastMessageTimestamp(x) | |
if timestamp < sixMonthsAgo or timestamp is None: | |
listOfDormantChannelIDs.append(x) | |
except: | |
continue | |
dormantChannels = {} | |
for x in listOfDormantChannelIDs: | |
dormantChannels[x] = dictofchannels[x] | |
return dormantChannels | |
# Download CSV of inactive channels | |
def csvOfDormantChannels(): | |
dormantChannels = channelsForArchiving() | |
try: | |
with open('dormant_channels.csv', 'w') as csvfile: | |
fields = ['ChannelID', 'ChannelName'] | |
writer = csv.DictWriter(csvfile, fieldnames=fields) | |
writer.writeheader() | |
writer.writerows(dormantChannels.items()) | |
except IOError: | |
print("I/O Error") | |
# Archive inactive channels listed in uploaded CSV | |
def archiveFromCSV(): | |
try: | |
with open('dormant_channels.csv', newline='') as csvfile: | |
reader = csv.reader(csvfile, delimiter=' ', quotechar='|') | |
channelsToArchive = [] | |
for row in reader: | |
channelsToArchive.append(row[0].split(',')[0]) | |
except IOError: | |
print("I/O Error") | |
for channelID in channelsToArchive: | |
try: | |
url = "https://slack.com/api/conversations.archive?channel=" + channelID | |
payload = {} | |
headers = { | |
'Authorization': 'Bearer ' + user_token | |
} | |
response = requests.request( | |
"POST", url, headers=headers, data=payload) | |
except: | |
continue | |
# Unarchive channels listed in CSV | |
def unarchiveFromCSV(): | |
try: | |
with open('dormant_channels.csv', newline='') as csvfile: | |
reader = csv.reader(csvfile, delimiter=' ', quotechar='|') | |
channelsToUnarchive = [] | |
for row in reader: | |
channelsToUnarchive.append(row[0].split(',')[0]) | |
except IOError: | |
print("I/O Error") | |
for channelID in channelsToUnarchive: | |
try: | |
url = "https://slack.com/api/conversations.unarchive?channel=" + channelID | |
payload = {} | |
headers = { | |
'Authorization': 'Bearer ' + user_token | |
} | |
response = requests.request( | |
"POST", url, headers=headers, data=payload) | |
except: | |
continue | |
# Main function | |
def main(): | |
print("What would you like to do?") | |
print('0. Quit this script') | |
print("1. Generate a CSV of inactive channels") | |
print("2. Archive channels in CSV") | |
print("3. Unarchive channels in CSV") | |
menuChoice = input("Pick a number:") | |
if menuChoice == "0": | |
sys.exit() | |
if menuChoice == "1": | |
makeCsv = csvOfDormantChannels() | |
print("CSV complete") | |
sys.exit() | |
if menuChoice == "2": | |
archiveFromCSV() | |
print("Channels have been archived") | |
sys.exit() | |
if menuChoice == "3": | |
unarchiveFromCSV() | |
print("Channels have been unarchived") | |
sys.exit() | |
else: | |
print("That's not a valid choice") | |
# Run script | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment