Skip to content

Instantly share code, notes, and snippets.

@Terrance
Last active June 20, 2018 18:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Terrance/431fb0be5b97ae18e380c0fc4d0aecd2 to your computer and use it in GitHub Desktop.
Save Terrance/431fb0be5b97ae18e380c0fc4d0aecd2 to your computer and use it in GitHub Desktop.
A minimal asyncio client for the Slack real-time messaging (RTM) APIs.
import aiohttp
import asyncio
import logging
class SlackAPIError(Exception): pass
class Slack(object):
"""
A tiny async Slack client for the RTM APIs.
"""
def __init__(self, token, log=logging.getLogger(__name__)):
self.token = token
self.log = log
self.sess = aiohttp.ClientSession()
self.team = self.users = self.channels = self.directs = None
# When we send messages asynchronously, we'll receive an RTM event before the HTTP request
# returns. This lock will block event parsing whilst we're sending, to make sure the caller
# can finish processing the new message (e.g. storing the ID) before receiving the event.
self.lock = asyncio.BoundedSemaphore()
self.callbacks = []
@asyncio.coroutine
def msg(self, **kwargs):
self.log.debug("Sending message")
with (yield from self.lock):
# Block event processing whilst we wait for the message to go through. Processing will
# resume once the caller yields or returns.
resp = yield from self.sess.post("https://slack.com/api/chat.postMessage",
data=dict(kwargs, token=self.token))
json = yield from resp.json()
if not json["ok"]:
raise SlackAPIError(json["error"])
return json
@asyncio.coroutine
def rtm(self):
self.log.debug("Requesting RTM session")
resp = yield from self.sess.post("https://slack.com/api/rtm.start",
data={"token": self.token})
json = yield from resp.json()
if not json["ok"]:
raise SlackAPIError(json["error"])
# Cache useful information about users and channels, to save on queries later.
self.team = json["team"]
self.users = {u["id"]: u for u in json["users"]}
self.log.debug("Users ({}): {}".format(len(self.users), self.users.keys()))
self.channels = {c["id"]: c for c in json["channels"] + json["groups"]}
self.log.debug("Channels ({}): {}".format(len(self.channels), self.channels.keys()))
self.directs = {c["id"]: c for c in json["ims"]}
self.log.debug("Directs ({}): {}".format(len(self.directs), self.directs.keys()))
sock = yield from self.sess.ws_connect(json["url"])
self.log.debug("Connected to websocket")
while True:
event = yield from sock.receive_json()
with (yield from self.lock):
# No critical section here, just wait for any pending messages to be sent.
pass
if "type" not in event:
self.log.warn("Received strange message with no type")
continue
self.log.debug("Received a '{}' event".format(event["type"]))
if event["type"] in ("team_join", "user_change"):
# A user appears or changed, update our cache.
self.users[event["user"]["id"]] = event["user"]
elif event["type"] in ("channel_joined", "group_joined"):
# A group or channel appeared, add to our cache.
self.channels[event["channel"]["id"]] = event["channel"]
elif event["type"] == "im_created":
# A DM appeared, add to our cache.
self.directs[event["channel"]["id"]] = event["channel"]
try:
for callback in self.callbacks:
yield from callback(event)
except Exception:
self.log.exception("Failed callback for event")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment