Skip to content

Instantly share code, notes, and snippets.

@KaustubhPatange
Created October 1, 2023 09:38
Show Gist options
  • Save KaustubhPatange/8216d7ceeb4d602dbfa5aa7bb33cbc10 to your computer and use it in GitHub Desktop.
Save KaustubhPatange/8216d7ceeb4d602dbfa5aa7bb33cbc10 to your computer and use it in GitHub Desktop.
A script to recursively delete Slack DM's messages and replies.
# A script that deletes your own messages, and your replies inside threads, from selected Slack DMs.
#
# Preparation:
# - Create a Slack app and install it in your workspace or any channel.
# - Under "OAuth & Permissions", add the channels:read, chat:write, groups:read, im:read, incoming-webhook, mpim:read
#   and users:read scopes to both the Bot Token and User Token scopes; you might need to reinstall the app in the workspace afterwards.
# - Copy the "User OAuth Token".
# - pip install requests==2.28.1 termcolor
import time
import requests
from termcolor import colored, cprint
# --- Configuration -----------------------------------------------------------
# User OAuth token of the installed Slack app; sent as a Bearer token on every request.
USER_TOKEN = "YOUR_USER_OAUTH_TOKEN"
# Member id of the account whose messages are deleted (compared with each message's "user").
USER_ID = "YOUR_MEMBER_ID_OF_WORKSPACE"
# Member ids of the DM counterparts whose conversations are cleaned up.
include_channels = ["ONE_OR_MORE_MEMBER_ID_OF_USER_TO_DELETE_MESSSAGE"]

# Headers shared by every Web API call in this script.
headers = {
    # The form media type is "application/x-www-form-urlencoded"; the previous
    # value was missing the "x-" prefix and is not a valid media type.
    "Content-Type": "application/x-www-form-urlencoded",
    "Authorization": f"Bearer {USER_TOKEN}",
}

# Number of delete calls issued since the last long pause; used by
# delete_slack_message to throttle itself.
delete_count = 0
def delete_slack_message(ts, channel):
    """Delete one message via chat.delete and report any failure.

    Args:
        ts: Timestamp id of the message to delete.
        channel: Id of the conversation that contains the message.

    A failed deletion is printed in red and does not raise, so a single bad
    message does not abort the whole run. After more than 10 calls the
    function pauses for 10 seconds before issuing the next request.
    """
    global delete_count
    if delete_count > 10:
        delete_count = 0
        print("Sleeping for 10 seconds")  # to avoid rate limit
        time.sleep(10)

    # Slack signals rate limiting with HTTP 429 and a Retry-After header
    # (seconds). Wait and retry a bounded number of times instead of treating
    # the message as undeletable.
    max_attempts = 5
    for attempt in range(max_attempts):
        response = requests.post(
            "https://slack.com/api/chat.delete",
            params={"channel": channel, "ts": ts},
            headers=headers,
        )
        if response.status_code != 429 or attempt == max_attempts - 1:
            break
        try:
            wait_seconds = int(response.headers.get("Retry-After", 10))
        except ValueError:
            wait_seconds = 10
        print(f"Rate limited, sleeping for {wait_seconds} seconds")
        time.sleep(wait_seconds)

    res = response.json()
    # .get() keeps an unexpected payload from turning into a KeyError.
    if not res.get("ok"):
        cprint(f'Error Deleting: {res.get("error", "unknown_error")}', color="red")
    delete_count += 1
# Collect the DM conversations whose counterpart is listed in include_channels.
# Each entry of `channels` is the "user" object returned by users.info for that
# counterpart, extended with a "channel_id" key holding the DM conversation id.
channels = []
user_cursor = None  # None makes requests omit the parameter, i.e. ask for the first page
while user_cursor != "":
    page = requests.post(
        "https://slack.com/api/users.conversations",
        params={"types": "im", "cursor": user_cursor},
        headers=headers,
    ).json()
    # Without this check an API error (bad token, missing scope, ...) would
    # surface as a KeyError on "channels" instead of a readable message.
    if not page.get("ok"):
        cprint(f'Error: {page.get("error", "unknown_error")}', color="red")
        break
    # An empty or missing next_cursor marks the last page.
    user_cursor = (page.get("response_metadata") or {}).get("next_cursor", "")

    for conversation in page.get("channels", []):
        if conversation.get("user") not in include_channels:
            continue
        info = requests.post(
            "https://slack.com/api/users.info",
            params={"user": conversation["user"]},
            headers=headers,
        ).json()
        if not info.get("ok"):
            cprint(f'Error: {info.get("error", "unknown_error")}', color="red")
            continue
        channel = info["user"]
        channel["channel_id"] = conversation["id"]
        print(channel["name"], channel["id"])
        channels.append(channel)
    time.sleep(0.2)  # small pause between pages to stay under the rate limit
def _delete_own_replies(channel_id, thread_ts):
    """Delete every reply written by USER_ID in the thread rooted at thread_ts."""
    reply_cursor = None  # None omits the parameter, i.e. asks for the first page
    while reply_cursor != "":
        res = requests.post(
            "https://slack.com/api/conversations.replies",
            params={"channel": channel_id, "ts": thread_ts, "cursor": reply_cursor},
            headers=headers,
        ).json()
        if not res.get("ok"):
            cprint(f'Error: {res.get("error", "unknown_error")}', color="red")
            break
        # An empty or missing next_cursor marks the last page.
        reply_cursor = (res.get("response_metadata") or {}).get("next_cursor", "")
        for reply in res.get("messages", []):
            # conversations.replies also returns the thread's parent message;
            # the caller has already dealt with it, so do not delete it twice.
            if reply.get("ts") == thread_ts:
                continue
            if reply.get("user") == USER_ID:
                d1 = colored("  Deleting reply: ", color="blue")
                d2 = colored(reply.get("text", ""), color="yellow")
                print(d1 + d2)
                delete_slack_message(reply["ts"], channel_id)
        time.sleep(1)  # to avoid rate limit


# Walk the full history of every selected DM and delete the messages (and
# thread replies) written by USER_ID.
for channel in channels:
    cprint(
        f">> Traversing converstaion history for channel: {channel['name']} ({channel['id']})",
        color="green",
    )
    # Start without a cursor (first page) instead of a made-up cursor value,
    # matching how the other paginated calls in this script begin.
    cursor = None
    while cursor != "":
        res = requests.post(
            "https://slack.com/api/conversations.history",
            params={"channel": channel["channel_id"], "cursor": cursor},
            headers=headers,
        ).json()
        if not res.get("ok"):
            cprint(f'Error: {res.get("error", "unknown_error")}', color="red")
            break
        cursor = (res.get("response_metadata") or {}).get("next_cursor", "")
        for message in res.get("messages", []):
            if message.get("user") == USER_ID:
                d1 = colored("Deleting message: ", color="blue")
                d2 = colored(message.get("text", ""), color="yellow")
                print(d1 + d2)
                delete_slack_message(message["ts"], channel["channel_id"])
            # Threads are scanned regardless of who started them, so own
            # replies under the other person's messages are removed as well.
            if message.get("reply_count", 0) > 0:
                _delete_own_replies(channel["channel_id"], message["ts"])
        time.sleep(1)  # to avoid rate limit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment