Skip to content

Instantly share code, notes, and snippets.

@cunla
Last active April 5, 2022 14:26
Show Gist options
  • Save cunla/8253a4595de1a6b721d65b9e2139c74b to your computer and use it in GitHub Desktop.
Save cunla/8253a4595de1a6b721d65b9e2139c74b to your computer and use it in GitHub Desktop.
Crawl a Slack channel for threads and all messages in each thread.
from datetime import datetime
import json
import logging
import os
from collections import namedtuple
from typing import Dict, Union, Tuple, List
from dotenv import load_dotenv, find_dotenv
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
# Load settings from the nearest .env file (if any) into the process
# environment before the constants below are read.
load_dotenv(find_dotenv())

# Slack credentials / settings; each is None when the variable is not set.
SLACK_BOT_TOKEN = os.getenv('SLACK_BOT_TOKEN')
SLACK_NOTIFICATIONS_CHANNEL = os.getenv('SLACK_NOTIFICATIONS_CHANNEL')
SLACK_SIGNING_SECRET_KEY = os.getenv('SLACK_SIGNING_SECRET_KEY')

# Name the logger after the module. `__package__` is None/empty when this file
# is run as a script or imported as a top-level module, which made this the
# root logger instead of a module-specific one.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Subset of a Slack user's profile kept by the crawler.
SlackUserInfo = namedtuple('SlackUserInfo', ['name', 'username', 'email', 'image_url'])
# One message of a thread: author id, ISO-formatted timestamp and message text.
MessageInfo = namedtuple('MessageInfo', ['user', 'ts', 'text'])
class SlackCrawler(object):
    """Collect the threads of a Slack channel, their messages and their users.

    On construction the crawler lists the conversations visible to the token
    and caches a channel-name -> channel-id mapping in ``channel_map``.
    The public entry point is :meth:`channel_threads`.
    """

    def __init__(self, token: str):
        """Create the Slack client and build the channel map.

        Args:
            token: Slack bot token. When it is empty or None, the module-level
                ``SLACK_BOT_TOKEN`` value is used instead.

        Raises:
            SlackApiError: If listing the conversations fails.
        """
        # Honour the caller-supplied token; it used to be ignored in favour of
        # the module-level constant. The fallback keeps old call sites working.
        self.client = WebClient(token or SLACK_BOT_TOKEN)
        self.threads_list = None
        self.channel_map = None
        self._scan_channels()

    @staticmethod
    def _paginate(api_method, items_key: str, **params):
        """Yield the ``items_key`` list of every page of a cursor-paginated call.

        Args:
            api_method: Bound ``WebClient`` method to call.
            items_key: Key of the response holding the page's items.
            **params: Arguments forwarded to ``api_method`` on every call.

        Raises:
            SlackApiError: Propagated from ``api_method`` while iterating.
        """
        cursor = None
        while True:
            if cursor:
                params['cursor'] = cursor
            response = api_method(**params)
            yield response[items_key]
            metadata = response.get('response_metadata') or {}
            cursor = metadata.get('next_cursor')
            # An absent or empty cursor marks the last page.
            if not cursor:
                return

    @staticmethod
    def _format_ts(ts: str) -> str:
        """Convert a Slack ``ts`` string into an ISO-8601 string.

        Sub-second digits are dropped and the result is a naive datetime in
        the local timezone of the running process.
        """
        return datetime.fromtimestamp(int(float(ts))).isoformat()

    def _channel_id(self, channel_name: str) -> str:
        """Return the id for ``channel_name``; raise KeyError if it is unknown."""
        try:
            return self.channel_map[channel_name]
        except KeyError:
            raise KeyError(
                f"Channel {channel_name!r} was not found among the scanned channels"
            ) from None

    def _scan_channels(self) -> Dict[str, str]:
        """
        Scan all channels that can be read and return a dictionary from
        channel name to its id. All result pages are fetched.

        Requires permissions (https://api.slack.com/messaging/retrieving):
        - channels:read
        - channels:history

        Returns:
            Dictionary Channel name => Channel ID (also stored in
            ``self.channel_map``).

        Raises:
            SlackApiError: If the Slack call fails.
        """
        channel_map = {}
        try:
            for page in self._paginate(self.client.conversations_list, 'channels'):
                channel_map.update({ch["name"]: ch["id"] for ch in page})
        except SlackApiError as e:
            logger.error("Error fetching conversations: %s", e)
            raise
        # Only replace the cached map once the full listing succeeded.
        self.channel_map = channel_map
        return channel_map

    def _channel_history(self, channel_name: str):
        """Return the distinct ``thread_ts`` values found in a channel's history.

        See https://api.slack.com/methods/conversations.history for pagination.

        Args:
            channel_name: Name of a channel present in ``channel_map``.

        Returns:
            List of thread timestamps, in order of first appearance, without
            duplicates.

        Raises:
            KeyError: If the channel name is unknown.
            SlackApiError: If the Slack call fails.
        """
        channel_id = self._channel_id(channel_name)
        logger.debug('Scanning history for %s (id=%s)', channel_name, channel_id)
        # A dict is used as an ordered set: several history messages can carry
        # the same thread_ts and each thread only needs to be fetched once.
        seen_threads = {}
        try:
            pages = self._paginate(
                self.client.conversations_history, 'messages', channel=channel_id)
            for messages in pages:
                logger.debug("%d messages found in #%s", len(messages), channel_name)
                for message in messages:
                    if 'thread_ts' in message:
                        seen_threads.setdefault(message['thread_ts'], None)
        except SlackApiError as e:
            logger.error(f"Error getting channel history for #{channel_name}: {e}")
            raise
        threads_list = list(seen_threads)
        logger.debug("%d threads found in #%s", len(threads_list), channel_name)
        return threads_list

    def _analyze_thread(self, channel_name: str, thread_ts: str):
        """Fetch every message of one thread.

        Args:
            channel_name: Name of a channel present in ``channel_map``.
            thread_ts: Timestamp identifying the thread.

        Returns:
            List of message dicts as returned by ``conversations.replies``,
            accumulated over all result pages.

        Raises:
            KeyError: If the channel name is unknown.
            SlackApiError: If the Slack call fails.
        """
        channel_id = self._channel_id(channel_name)
        messages = []
        seen_ts = set()
        try:
            pages = self._paginate(
                self.client.conversations_replies, 'messages',
                channel=channel_id, ts=thread_ts)
            for page in pages:
                for message in page:
                    ts = message.get('ts')
                    # NOTE(review): the parent message appears to be repeated
                    # on follow-up pages; de-duplicating on ts is harmless
                    # either way - confirm against the Slack API docs.
                    if ts is not None and ts in seen_ts:
                        continue
                    seen_ts.add(ts)
                    messages.append(message)
        except SlackApiError as e:
            logger.error(
                f"Error getting replies of thread {thread_ts} in #{channel_name}: {e}")
            raise
        return messages

    def _get_user_info(self, slack_user_id: str) -> Union[SlackUserInfo, None]:
        """
        Look up a user's profile.

        requires: users:read.email, users:read

        Args:
            slack_user_id: Slack user id to look up.

        Returns:
            The user's ``SlackUserInfo`` (missing profile fields are None), or
            None when the lookup fails; failures are logged, not raised.
        """
        try:
            response = self.client.users_info(user=slack_user_id)
        except SlackApiError as e:
            logger.warning(
                f"Couldn't find slack user {slack_user_id}, slack error: {e.response.get('error')}")
            return None
        if not response['ok']:
            logger.warning(
                f"Couldn't find slack user {slack_user_id}, slack error: {response.get('error')}")
            return None
        user = response.get('user') or {}
        profile = user.get('profile') or {}
        return SlackUserInfo(
            profile.get('real_name'),
            user.get('name'),
            profile.get('email'),
            profile.get('image_original'),
        )

    def channel_threads(
            self, channel_name: str
    ) -> Tuple[Dict[str, Union[SlackUserInfo, None]], Dict[str, List[MessageInfo]]]:
        """
        Scan a channel for threads.

        Args:
            channel_name: Name of a channel present in ``channel_map``.

        Returns:
            Tuple[
                Dict userId -> SlackUserInfo (None when the lookup failed),
                Dict thread_ts -> list of MessageInfo(userId, ts, text)
            ]
            Every user id that appears in a returned message has an entry in
            the first dictionary.

        Raises:
            KeyError: If the channel name is unknown.
            SlackApiError: If reading the history or a thread fails.
        """
        user_ids = set()
        threads_dict = dict()
        for thread_ts in self._channel_history(channel_name):
            thread = self._analyze_thread(channel_name, thread_ts)
            if not thread:
                # Nothing came back for this thread; skip instead of failing
                # on thread[0].
                continue
            # Repliers advertised on the parent message ...
            user_ids.update(thread[0].get('reply_users') or ())
            # ... plus the author of every fetched message. This covers the
            # parent's author and anyone reply_users does not list.
            # Presumably some messages (e.g. integration posts) carry no
            # 'user' key - those are recorded with user=None.
            user_ids.update(msg['user'] for msg in thread if msg.get('user'))
            threads_dict[thread_ts] = [
                MessageInfo(msg.get('user'),
                            self._format_ts(msg['ts']),
                            msg.get('text', ''))
                for msg in thread
            ]
        logger.debug('Getting info for %d users', len(user_ids))
        users_dict = {
            user_id: self._get_user_info(user_id)
            for user_id in user_ids
        }
        return users_dict, threads_dict
if __name__ == '__main__':
    # Script entry point: dump the users and threads of one channel as JSON.
    # Without a bot token there is nothing to crawl, so bail out first.
    if SLACK_BOT_TOKEN is None:
        logger.error("SLACK_BOT_TOKEN not set, exiting")
        raise EnvironmentError("slack-client not initialized, can't scan")

    channel = 'general'
    crawler = SlackCrawler(SLACK_BOT_TOKEN)
    users_dict, threads_dict = crawler.channel_threads(channel)
    # Users first, then threads; the namedtuple values are emitted as JSON arrays.
    for payload in (users_dict, threads_dict):
        print(json.dumps(payload, indent=2))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment