Last active
April 5, 2022 14:26
-
-
Save cunla/8253a4595de1a6b721d65b9e2139c74b to your computer and use it in GitHub Desktop.
Crawl a Slack channel for threads and all messages in each thread.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
import json | |
import logging | |
import os | |
from collections import namedtuple | |
from typing import Dict, Union, Tuple, List | |
from dotenv import load_dotenv, find_dotenv | |
from slack_sdk import WebClient | |
from slack_sdk.errors import SlackApiError | |
# Load variables from a .env file (whatever path find_dotenv() resolves) before
# the environment is read below.
load_dotenv(find_dotenv())

# Slack settings taken from the environment; each is None when the variable is unset.
SLACK_BOT_TOKEN = os.environ.get('SLACK_BOT_TOKEN')
SLACK_NOTIFICATIONS_CHANNEL = os.environ.get('SLACK_NOTIFICATIONS_CHANNEL')
SLACK_SIGNING_SECRET_KEY = os.environ.get('SLACK_SIGNING_SECRET_KEY')

# Logger named after the enclosing package; logging is configured at INFO level.
logger = logging.getLogger(__package__)
logging.basicConfig(level=logging.INFO)

# Lightweight records describing a Slack user profile and a single message.
SlackUserInfo = namedtuple('SlackUserInfo', 'name username email image_url')
MessageInfo = namedtuple('MessageInfo', 'user ts text')
class SlackCrawler(object):
    """Crawl Slack channels for threads and the messages inside each thread.

    On construction the crawler builds a channel-name -> channel-id map, which
    the other methods use to translate the channel names they are given.
    """

    def __init__(self, token: str):
        """
        Create the Slack client and scan the readable channels.

        Args:
            token: Slack bot token used to authenticate the client. When it is
                empty/None the module-level SLACK_BOT_TOKEN is used instead.
        Raises:
            SlackApiError: if the channel list cannot be fetched.
        """
        # Honour the token supplied by the caller; fall back to the environment
        # token so callers that relied on the old behaviour keep working.
        self.client = WebClient(token or SLACK_BOT_TOKEN)
        self.threads_list = None
        self.channel_map = None
        self._scan_channels()

    def _scan_channels(self) -> Dict[str, str]:
        """
        Scan all channels that can be read and return a dictionary from
        channel name to its id. The result is also stored in `self.channel_map`.

        Requires permissions (https://api.slack.com/messaging/retrieving):
        - channels:read
        - channels:history

        Returns:
            Dictionary Channel name => Channel ID.
        Raises:
            SlackApiError: if the Slack API call fails (logged, then re-raised).
        """
        channel_map = dict()
        cursor = None
        try:
            # Follow `next_cursor` so channels beyond the first page are mapped too.
            while True:
                response = self.client.conversations_list(cursor=cursor)
                for channel in response["channels"]:
                    channel_map[channel["name"]] = channel["id"]
                cursor = (response.get('response_metadata') or dict()).get('next_cursor', None)
                if not cursor:
                    break
        except SlackApiError as e:
            logger.error("Error fetching conversations: {}".format(e))
            raise
        self.channel_map = channel_map
        return self.channel_map

    def _channel_history(self, channel_name: str) -> List[str]:
        """
        Collect the `thread_ts` of every threaded message in a channel.

        Args:
            channel_name: Name of the channel; must be a key of `self.channel_map`.
        Returns:
            List of distinct thread timestamps, in the order they were first seen.
        Raises:
            KeyError: if the channel name is not in `self.channel_map`.
            SlackApiError: if the Slack API call fails (logged, then re-raised).
        """
        # https://api.slack.com/methods/conversations.history#pagination
        channel_id = self.channel_map[channel_name]
        logger.debug(f'Scanning history for {channel_name} (id={channel_id})')
        cursor = None
        # A dict is used as an ordered set: the same thread_ts can show up on
        # more than one message, and each thread only needs to be fetched once.
        seen_threads = dict()
        try:
            while True:
                result = self.client.conversations_history(channel=channel_id, cursor=cursor)
                logger.debug(f"{len(result['messages'])} messages found in #{channel_name}")
                for item in result['messages']:
                    if 'thread_ts' in item:
                        seen_threads[item['thread_ts']] = None
                cursor = (result.get('response_metadata') or dict()).get('next_cursor', None)
                if not cursor:
                    break
        except SlackApiError as e:
            logger.error(f"Error getting channel history for #{channel_name}: {e}")
            raise
        threads_list = list(seen_threads)
        logger.debug(f"{len(threads_list)} threads found in #{channel_name}")
        return threads_list

    def _analyze_thread(self, channel_name: str, thread_ts: str):
        """
        Fetch the messages of a single thread.

        Args:
            channel_name: Name of the channel; must be a key of `self.channel_map`.
            thread_ts: Timestamp identifying the thread.
        Returns:
            The `messages` list of the conversations.replies response.
        Raises:
            KeyError: if the channel name is not in `self.channel_map`.
            SlackApiError: if the Slack API call fails (logged, then re-raised).
        """
        channel_id = self.channel_map[channel_name]
        try:
            res = self.client.conversations_replies(channel=channel_id, ts=thread_ts)
            return res['messages']
        except SlackApiError as e:
            logger.error(f"Error getting replies for thread {thread_ts} in #{channel_name}: {e}")
            raise

    def _get_user_info(self, slack_user_id: str) -> Union[SlackUserInfo, None]:
        """
        Look up profile information for a Slack user.

        requires: users:read.email, users:read

        Args:
            slack_user_id: Slack id of the user to look up.
        Returns:
            SlackUserInfo built from the user's profile, or None when the
            lookup fails (the failure is logged as a warning, not raised).
        """
        try:
            response = self.client.users_info(user=slack_user_id)
        except SlackApiError as e:
            logger.warning(f"Couldn't find slack user {slack_user_id}, slack error: {e.response['error']}")
            return None
        if not response['ok']:
            logger.warning(f"Couldn't find slack user {slack_user_id}, slack error: {response['error']}")
            return None
        user = response.get('user', {})
        profile = user.get('profile', {})
        return SlackUserInfo(
            profile.get('real_name', None),
            user.get('name', None),
            profile.get('email', None),
            profile.get('image_original', None),
        )

    def channel_threads(
            self, channel_name: str,
    ) -> Tuple[Dict[str, Union[SlackUserInfo, None]], Dict[str, List[MessageInfo]]]:
        """
        Scan a channel for threads.

        Args:
            channel_name: Name of the channel; must be a key of `self.channel_map`.
        Returns:
            Tuple[
                Dict userId -> SlackUserInfo (None when the lookup failed),
                Dict thread_ts -> list of MessageInfo(user, ts, text)
            ]
            `MessageInfo.ts` is the message timestamp truncated to whole
            seconds and rendered with `datetime.fromtimestamp(...).isoformat()`.
        Raises:
            KeyError: if the channel name is not in `self.channel_map`.
            SlackApiError: if fetching history or thread replies fails.
        """
        users_set = set()
        threads_dict = dict()
        threads_list = self._channel_history(channel_name)
        for thread_ts in threads_list:
            thread = self._analyze_thread(channel_name, thread_ts)
            if not thread:
                # Nothing came back for this thread; there is no first message to inspect.
                continue
            # The first message's `reply_users` may be missing, so default to an empty list.
            users_set.update(thread[0].get('reply_users', []))
            messages = []
            for message in thread:
                # Some messages carry no `user` key; record None instead of failing.
                author = message.get('user')
                if author is not None:
                    # Collecting every author guarantees each user id in the
                    # returned threads has an entry in the users dictionary.
                    users_set.add(author)
                messages.append(MessageInfo(
                    author,
                    datetime.fromtimestamp(int(float(message['ts']))).isoformat(),
                    message.get('text', ''),
                ))
            threads_dict[thread_ts] = messages
        logger.debug(f'Getting info for {len(users_set)} users')
        users_dict = {
            user_id: self._get_user_info(user_id)
            for user_id in users_set
        }
        return users_dict, threads_dict
if __name__ == '__main__':
    # Script entry point: dump the users and threads of one channel as JSON.
    # Fail fast when no bot token is configured.
    if SLACK_BOT_TOKEN is None:
        logger.error("SLACK_BOT_TOKEN not set, exiting")
        raise EnvironmentError("slack-client not initialized, can't scan")

    channel = 'general'
    crawler = SlackCrawler(SLACK_BOT_TOKEN)
    users_dict, threads_dict = crawler.channel_threads(channel)

    # Users first, then threads, each pretty-printed.
    for payload in (users_dict, threads_dict):
        print(json.dumps(payload, indent=2))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment