Skip to content

Instantly share code, notes, and snippets.

@martinohanlon
Created November 15, 2017 17:14
Show Gist options
  • Save martinohanlon/477b6ea4c3bdc679ddff92dfc3bff4a7 to your computer and use it in GitHub Desktop.
Save martinohanlon/477b6ea4c3bdc679ddff92dfc3bff4a7 to your computer and use it in GitHub Desktop.
Slack command line streamer
# A terminal slack streamer
# pre-requisites
# - colorama : pip install colorama
# - slackclient : pip install slackclient
# - a legacy slack api token : https://api.slack.com/custom-integrations/legacy-tokens
from slackclient import SlackClient
from colorama import init, Fore, Back
import time
import os
# init colorama
init()
# get a legacy api token from slack and put it in an environment variable
slack_token = os.getenv("SLACK_API_TOKEN")
# cache user details rather than use the api everytime
users = {}
def get_user_info(sc, user_id):
if user_id in users.keys():
return users[user_id]
else:
user_info_return = sc.api_call("users.info", user = user_id)
if user_info_return["ok"] == True:
users[user_id] = user_info_return["user"]
return user_info_return["user"]
# cache channels
channels = {}
def get_channel_info(sc, channel_id):
if channel_id in channels.keys():
return channels[channel_id]
else:
channel_info_return = sc.api_call("channels.info", channel = channel_id)
if channel_info_return["ok"] == True:
channels[channel_id] = channel_info_return["channel"]
return channel_info_return["channel"]
# open up slack
sc = SlackClient(slack_token)
print(Fore.GREEN + "Connecting to slack")
if sc.rtm_connect():
while True:
#get any new data from slack
data_lines = sc.rtm_read()
for data in data_lines:
#print(data)
#if the data received is a message stick in on the screen
if data["type"] == "message":
what_channel = Back.BLUE + Fore.WHITE + "#" + get_channel_info(sc, data["channel"])["name"]
what_user = Back.BLACK + Fore.RED + " " + get_user_info(sc, data["user"])["name"] + " "
message = Back.BLACK + Fore.WHITE + "'" + data["text"] + "'"
print(what_channel + what_user + message)
time.sleep(1)
else:
print("Connection Failed")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment