|
# Usage: |
|
# SlackRtmApiSandbox.start_link({"YOUR_BOT_TOKEN", "your channel id to notify"}) |
|
# or, |
|
# Supervisor.start_link([SlackRtmApiSandbox.child_spec({"YOUR_BOT_TOKEN", "your channel id to notify"})], strategy: :one_for_one) |
|
defmodule SlackRtmApiSandbox do |
|
use GenServer |
|
require Logger |
|
|
|
defmodule Config do |
|
@enforce_keys [:token, :channel_id_to_notify] |
|
defstruct [:token, :channel_id_to_notify] |
|
end |
|
|
|
# https://api.slack.com/methods/rtm.connect |
|
def get_rtm_connect(token) do |
|
{:ok, conn_pid} = :gun.open('slack.com', 443) |
|
stream_ref = :gun.get(conn_pid, '/api/rtm.connect?token=#{token}') |
|
{:response, :nofin, _status, _headers} = :gun.await(conn_pid, stream_ref) |
|
{:ok, body} = :gun.await_body(conn_pid, stream_ref) |
|
:gun.shutdown(conn_pid) |
|
:gun.flush(conn_pid) |
|
# https://api.slack.com/methods/rtm.connect#response |
|
Jason.decode!(body) |
|
end |
|
|
|
# https://api.slack.com/rtm |
|
def connect_to_websocket(url) do |
|
%URI{scheme: "wss", host: host, path: path} = URI.parse(url) |
|
{:ok, conn_pid} = :gun.open(String.to_charlist(host), 443) |
|
{:ok, :http} = :gun.await_up(conn_pid) |
|
:gun.ws_upgrade(conn_pid, String.to_charlist(path)) |
|
conn_pid |
|
end |
|
|
|
def start_link({token, channel_id_to_notify}) do |
|
GenServer.start_link(__MODULE__, {token, channel_id_to_notify}) |
|
end |
|
|
|
def init({token, channel_id_to_notify}) do |
|
send(self(), :initialize) |
|
{:ok, {:initializing, %Config{token: token, channel_id_to_notify: channel_id_to_notify}}} |
|
end |
|
|
|
def handle_call(:messages, _from, {:connected, config, self, team, conn, monitor_ref, messages}) do |
|
reply = Enum.map(messages, &(elem(&1, 1))) |
|
|> Enum.map(&Jason.decode!/1) |
|
|> Enum.reverse |
|
{:reply, reply, {:connected, config, self, team, conn, monitor_ref, []}} |
|
end |
|
|
|
def handle_cast({:send_text, text}, {:connected, _, _, _, conn, _, _} = state) do |
|
:ok = :gun.ws_send(conn, {:text, text}) |
|
{:noreply, state} |
|
end |
|
|
|
def handle_cast({:send_text, text}, state) do |
|
Process.send_after(self(), {:send_text, text}, 1_000) |
|
{:noreply, state} |
|
end |
|
|
|
def handle_info(:initialize, {:initializing, config}) do |
|
# The name of `self` has been already used by `Kernel.self/0`. |
|
# So we should use an alternate name as variable to avoid failing by naming confliction. |
|
%{"ok" => true, "self" => me, "team" => team, "url" => url} = get_rtm_connect(String.to_charlist(config.token)) |
|
Logger.info("get_rtm_connect in the initialize: self: #{inspect me}, team: #{inspect team}") |
|
conn_pid = connect_to_websocket(url) |
|
Logger.info("connect_to_websocket in the initialize: conn_pid: #{inspect conn_pid}") |
|
monitor_ref = Process.monitor(conn_pid) |
|
{:noreply, {:connecting, config, me, team, conn_pid, monitor_ref, []}, 3_000} |
|
end |
|
|
|
def handle_info({:gun_ws_upgrade, _, :ok, _}, {:connecting, config, me, team, conn, monitor_ref, messages}) do |
|
schedule_work() |
|
{:noreply, {:connected, config, me, team, conn, monitor_ref, messages}} |
|
end |
|
|
|
def handle_info({:gun_response, _, _, _, status, headers}, state) do |
|
reason = {status, headers} |
|
{:stop, {:ws_upgrade_failed, reason}, state} |
|
end |
|
|
|
def handle_info({:gun_error, _, _, reason}, state) do |
|
{:stop, {:gun_error, reason}, state} |
|
end |
|
|
|
def handle_info({:DOWN, _, _, _, reason}, state) do |
|
{:stop, {:DOWN, reason}, state} |
|
end |
|
|
|
def handle_info({:gun_ws, _, {:text, frame}}, {status, config, me, team, conn, monitor_ref, messages}) do |
|
decoded = Jason.decode!(frame) |
|
Logger.info("An event received, status: #{status}, frame: #{inspect decoded}") |
|
# https://api.slack.com/rtm#events |
|
case decoded["type"] do |
|
"message" -> |
|
# https://api.slack.com/events/message |
|
case decoded["subtype"] do |
|
# normal message doesn't have subtype |
|
nil -> |
|
# handle a mention |
|
if String.contains?(decoded["text"], "<@#{me["id"]}>") do |
|
tokens = |
|
String.replace(decoded["text"], "<@#{me["id"]}>", "") # remove mention part |
|
|> String.split # split tokens |
|
|> Enum.map(&String.trim/1) # trim tokens |
|
cond do |
|
Enum.empty?(tokens) -> |
|
# callback mention to the caller |
|
json = Jason.encode!(%{ |
|
id: System.os_time(), |
|
type: "message", |
|
channel: decoded["channel"], |
|
text: "<@#{decoded["user"]}>" |
|
}) |
|
Logger.debug("Sending mention to the caller: #{inspect json}") |
|
GenServer.cast(self(), {:send_text, json}) |
|
Enum.all?(tokens, &(Regex.match?(~r/<@\w+>/, &1))) -> |
|
# All tokens are replesentation of users. |
|
user_id = Enum.random(tokens) |
|
json = Jason.encode!(%{ |
|
id: System.os_time(), |
|
type: "message", |
|
channel: decoded["channel"], |
|
text: user_id |
|
}) |
|
Logger.debug("Sending mention to one of the users randomly: #{inspect json}, in #{inspect tokens}") |
|
GenServer.cast(self(), {:send_text, json}) |
|
true -> |
|
Logger.debug(inspect tokens) |
|
end |
|
end |
|
_ -> |
|
nil # Catch all submessages but DO NOTHING |
|
end |
|
"channel_created" -> |
|
# https://api.slack.com/events/channel_created |
|
json = Jason.encode!(%{ |
|
id: System.os_time(), |
|
type: "message", |
|
channel: config.channel_id_to_notify, |
|
text: "Channel created: <##{decoded["channel"]["id"]}>" |
|
}) |
|
GenServer.cast(self(), {:send_text, json}) |
|
_ -> |
|
nil # Catch all events but DO NOTHING |
|
end |
|
{:noreply, {status, config, me, team, conn, monitor_ref, [decoded | messages]}} |
|
end |
|
|
|
def handle_info({:send_text, text}, {:connected, _, _, _, conn, _, _} = state) do |
|
:ok = :gun.ws_send(conn, {:text, text}) |
|
{:noreply, state} |
|
end |
|
|
|
def handle_info({:send_text, text}, state) do |
|
Process.send_after(self(), {:send_text, text}, 1_000) |
|
{:noreply, state} |
|
end |
|
|
|
def handle_info(:do_ping, state) do |
|
# https://api.slack.com/rtm#ping_and_pong |
|
Logger.debug("Sending ping") |
|
schedule_work() |
|
json = Jason.encode!(%{ |
|
id: System.os_time(), |
|
type: "ping" |
|
}) |
|
GenServer.cast(self(), {:send_text, json}) |
|
{:noreply, state} |
|
end |
|
|
|
# catch all unexpected messages |
|
def handle_info(message, state) do |
|
Logger.warn("Unexpected message received, status: #{inspect elem(state, 0)}, message: #{inspect message}") |
|
{:noreply, state} |
|
end |
|
|
|
def terminate(reason, {:initializing, _}) do |
|
{:shutdown, {:initializing, reason}} |
|
end |
|
|
|
def terminate(reason, {status, _, _, _, conn, monitor_ref, _}) do |
|
Process.demonitor(monitor_ref) |
|
:gun.close(conn) |
|
{:shutdown, {status, reason}} |
|
end |
|
|
|
defp schedule_work() do |
|
# We know this implementation is a bit silly. |
|
# The client pings to server as fixed interval even if any other communication have between the client and the server. |
|
# Of cource we can implement more smarter one, but the code gets more complex. |
|
# To keep the code simple, I think the code is OK. |
|
# If you don't think so or think we can choose better one, feel free to let us know. |
|
Process.send_after(self(), :do_ping, 60 * 1000) # In 60 seconds |
|
end |
|
end |