Skip to content

Instantly share code, notes, and snippets.

@bowbahdoe
Created May 28, 2021 15:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bowbahdoe/ce961e6c7449c1ffe9877082465b1f47 to your computer and use it in GitHub Desktop.
Save bowbahdoe/ce961e6c7449c1ffe9877082465b1f47 to your computer and use it in GitHub Desktop.
;; ----------------------------------------------------------------------------
(defn messages-since
"returns a max `n` messages since the given instant."
[{:keys [db]} {:keys [since n group-id]
:or {since (Instant/ofEpochMilli 0)
n Integer/MAX_VALUE}}]
(jdbc/execute!
db
(db/expand-named-parameters-compile-time
"SELECT * FROM \"group\"
WHERE message.created_at > :since
ORDER BY message.created_at ASC
JOIN message ON message.to_group_id = :group-id
LIMIT :n"
{:since since
:n n
:group-id group-id})))
;; ----------------------------------------------------------------------------
(defn message-key
"The key under which messages between users are published."
[from-id to-id]
(String/format "message_from_page_%s_to_group_%s"
(into-array [(str from-id) (str to-id)])))
;; ----------------------------------------------------------------------------
(defn parse-message-key
[message-key]
(let [[_ _ _ from-page _ _ to-group] (string/split message-key #"_")
from-page-id (utils/->int from-page)
to-group-id (utils/->int to-group)]
(when (and from-page-id to-group-id)
{:from-page-id from-page-id
:to-group-id to-group-id})))
;; ----------------------------------------------------------------------------
(defn send-message!
"Sends a message to the given user. Goes in a round-trip through redis."
[{:keys [db redis-client-pool]} from-page to-page contents]
(jdbc/with-transaction [transaction db]
(let [group (group/find-or-create! transaction [(:page/id from-page) (:page/id to-page)])
message (sql/insert! transaction "message" {:from_page_id (:page/id from-page)
:to_group_id (:group/id group)
:contents contents})]
(redis/with-client [client redis-client-pool]
(.publish client
(message-key (:page/id from-page) (:group/id group))
(utils/->transit message)))
message)))
;; ----------------------------------------------------------------------------
(defn- chat-messages-listener
"Creates a jedis pub sub instance that handles any received messages
and shuffles them off to users."
[db connections]
(proxy [JedisPubSub] []
(onPMessage [_pattern channel message]
(log/info ::event :received-message
::channel channel
::between (parse-message-key channel))
(let [{:keys [to-group-id]} (parse-message-key channel)
pages (->> (group/find-by-id db to-group-id)
(:group/participating_page_ids)
(mapv #(page/by-id db %)))
active-connections @connections
user-ids (map :page/user_id pages)
callbacks (mapcat active-connections user-ids)]
(doseq [callback callbacks]
(when callback
(callback (utils/<-transit message))))))))
;; ----------------------------------------------------------------------------
(defn create-chat-subsystem
"Creates an object that holds the info required to manage
the chat subsystem, including sending notifications to
users when messages are sent."
[db ^JedisPool redis-client-pool]
(let [;; Map of user-id to callbacks to call when a
;; new message comes through for them.
connections (atom {})
subsystem {::connections connections
;; Objects needed to manage subscribing to redis
;; for messages posted on other nodes.
::redis-client (.getResource redis-client-pool)
::redis-pub-sub (chat-messages-listener db connections)
::subscription-executor (Executors/newSingleThreadExecutor
(-> (BasicThreadFactory$Builder.)
(.namingPattern "chat-subsystem-%s")
(.build)))}]
(.submit (::subscription-executor subsystem)
(reify Runnable
(run [_]
(utils/restart-on-failure
(.psubscribe (::redis-client subsystem)
(::redis-pub-sub subsystem)
(into-array [(message-key "*" "*")]))))))
subsystem))
;; ----------------------------------------------------------------------------
(defn shutdown-chat-subsystem! [chat-subsystem]
(log/info ::shutdown-step "Unsubscribing from channels.")
(.punsubscribe (::redis-pub-sub chat-subsystem))
(log/info ::shutdown-step "Returning redis client to the pool.")
(.close (::redis-client chat-subsystem))
(log/info ::shutdown-step "Shutting down the executor")
(.shutdownNow (::subscription-executor chat-subsystem)))
;; ----------------------------------------------------------------------------
(defn attach-user-session! [chat-subsystem user-id callback]
(swap! (::connections chat-subsystem)
(fn [users]
(update users user-id conj callback))))
;; ----------------------------------------------------------------------------
(defn remove-user-session!
[chat-subsystem user-id callback]
(swap! (::connections chat-subsystem)
(fn [users]
(let [new-callbacks-for-user (remove #{callback} (users user-id))]
(if (empty? new-callbacks-for-user)
(dissoc users user-id)
(assoc users user-id new-callbacks-for-user))))))
;; ----------------------------------------------------------------------------
(defn user-messages-streamer
[ctx _args source-stream]
(let [{:keys [user]} ctx
{:keys [chat-subsystem]} (:system ctx)]
(log/info ::event :user-subscribing-to-messages
::user-id (:user/id user))
(attach-user-session! chat-subsystem (:user/id user) source-stream)
#(remove-user-session! chat-subsystem (:user/id user) source-stream)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment