-
-
Save bowbahdoe/ce961e6c7449c1ffe9877082465b1f47 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; ---------------------------------------------------------------------------- | |
(defn messages-since
  "Returns at most `n` messages addressed to the group `group-id` whose
  created_at is strictly after the instant `since`, oldest first.

  The first argument is a map holding the `:db` connectable. The second is
  an options map:
    :group-id  id of the group whose messages are fetched
    :since     exclusive lower bound on created_at; defaults to the epoch
    :n         maximum number of rows; defaults to Integer/MAX_VALUE

  The query selects * across the join, so each row carries the columns of
  both \"group\" and message."
  [{:keys [db]} {:keys [since n group-id]
                 :or {since (Instant/ofEpochMilli 0)
                      n Integer/MAX_VALUE}}]
  (jdbc/execute!
   db
   ;; JOIN has to come before WHERE / ORDER BY / LIMIT to be valid SQL.
   ;; The join is tied to \"group\".id so that exactly one group row is
   ;; paired with its messages instead of every row of \"group\".
   (db/expand-named-parameters-compile-time
    "SELECT * FROM \"group\"
     JOIN message ON message.to_group_id = \"group\".id
     WHERE \"group\".id = :group-id
     AND message.created_at > :since
     ORDER BY message.created_at ASC
     LIMIT :n"
    {:since since
     :n n
     :group-id group-id})))
;; ---------------------------------------------------------------------------- | |
(defn message-key
  "Builds the redis channel name on which a message from the page `from-id`
  to the group `to-id` is published. Both ids are coerced with `str`, so a
  wildcard such as \"*\" can be passed to build a subscription pattern."
  [from-id to-id]
  (let [from-part (str from-id)
        to-part (str to-id)]
    (format "message_from_page_%s_to_group_%s" from-part to-part)))
;; ---------------------------------------------------------------------------- | |
(defn parse-message-key
  "Inverse of `message-key`: splits a channel name on underscores and reads
  the page id (4th segment) and group id (7th segment) through
  `utils/->int`. Returns a map with :from-page-id and :to-group-id, or nil
  when either converted id is falsy."
  [message-key]
  (let [segments (string/split message-key #"_")
        ;; message_from_page_<3>_to_group_<6>
        from-page-id (utils/->int (nth segments 3 nil))
        to-group-id (utils/->int (nth segments 6 nil))]
    (if (and from-page-id to-group-id)
      {:from-page-id from-page-id
       :to-group-id to-group-id}
      nil)))
;; ---------------------------------------------------------------------------- | |
(defn send-message!
  "Stores a message with `contents` from `from-page` and publishes it on
  redis; returns the inserted message row.

  The target group is obtained via `group/find-or-create!` for the pair of
  page ids of `from-page` and `to-page`. The publish runs inside the
  transaction body, i.e. before the transaction is committed."
  [{:keys [db redis-client-pool]} from-page to-page contents]
  (let [sender-id (:page/id from-page)
        recipient-id (:page/id to-page)]
    (jdbc/with-transaction [transaction db]
      (let [group-id (:group/id (group/find-or-create! transaction
                                                       [sender-id recipient-id]))
            message (sql/insert! transaction
                                 "message"
                                 {:from_page_id sender-id
                                  :to_group_id group-id
                                  :contents contents})]
        (redis/with-client [client redis-client-pool]
          ;; The channel name encodes sender page and target group; the
          ;; payload is the inserted row serialised by utils/->transit.
          (.publish client
                    (message-key sender-id group-id)
                    (utils/->transit message)))
        message))))
;; ---------------------------------------------------------------------------- | |
(defn- chat-messages-listener
  "Creates a JedisPubSub proxy whose onPMessage handler forwards each
  received message to the callbacks registered for the users behind the
  pages participating in the target group.

  `db` is used to look up the group and its pages. `connections` is an atom
  holding a map of user-id to a collection of callbacks.

  For every message the handler:
    1. parses the channel name once (logged and used for routing),
    2. skips routing when the channel yields no group id,
    3. resolves the distinct user ids of the participating pages, so a user
       owning several of those pages is not notified more than once per
       registered callback,
    4. decodes the transit payload once and hands it to each callback."
  [db connections]
  (proxy [JedisPubSub] []
    (onPMessage [_pattern channel message]
      (let [parsed-key (parse-message-key channel)
            to-group-id (:to-group-id parsed-key)]
        (log/info ::event :received-message
                  ::channel channel
                  ::between parsed-key)
        ;; A channel that matches the subscription pattern but does not
        ;; carry parsable ids has nobody to be routed to.
        (when to-group-id
          (let [pages (->> (group/find-by-id db to-group-id)
                           (:group/participating_page_ids)
                           (mapv #(page/by-id db %)))
                ;; Deref after the lookups so the freshest registrations
                ;; are used.
                active-connections @connections
                user-ids (distinct (map :page/user_id pages))
                callbacks (filter identity
                                  (mapcat active-connections user-ids))]
            ;; Only decode when somebody is listening, and only once.
            (when (seq callbacks)
              (let [payload (utils/<-transit message)]
                (doseq [callback callbacks]
                  (callback payload))))))))))
;; ---------------------------------------------------------------------------- | |
(defn create-chat-subsystem
  "Builds the chat subsystem map and starts its redis subscription.

  The returned map contains:
    ::connections            atom of user-id -> callbacks to invoke when a
                             new message comes through for that user
    ::redis-client           a client taken from `redis-client-pool`
    ::redis-pub-sub          the listener from `chat-messages-listener`
    ::subscription-executor  single-thread executor running the subscription

  Before returning, a task is submitted to the executor that pattern-
  subscribes the listener to `(message-key \"*\" \"*\")`, wrapped in
  `utils/restart-on-failure`."
  [db ^JedisPool redis-client-pool]
  (let [connections (atom {})
        redis-client (.getResource redis-client-pool)
        pub-sub (chat-messages-listener db connections)
        thread-factory (.build
                        (.namingPattern (BasicThreadFactory$Builder.)
                                        "chat-subsystem-%s"))
        executor (Executors/newSingleThreadExecutor thread-factory)
        subscribe-task (reify Runnable
                         (run [_]
                           (utils/restart-on-failure
                            (.psubscribe redis-client
                                         pub-sub
                                         (into-array [(message-key "*" "*")])))))]
    (.submit executor subscribe-task)
    {::connections connections
     ::redis-client redis-client
     ::redis-pub-sub pub-sub
     ::subscription-executor executor}))
;; ---------------------------------------------------------------------------- | |
(defn shutdown-chat-subsystem!
  "Tears down a subsystem created by `create-chat-subsystem`, logging each
  step: pattern-unsubscribes the pub/sub listener, closes the redis client,
  then calls shutdownNow on the subscription executor. Returns the result
  of that last call."
  [chat-subsystem]
  (let [{pub-sub ::redis-pub-sub
         client ::redis-client
         executor ::subscription-executor} chat-subsystem]
    (log/info ::shutdown-step "Unsubscribing from channels.")
    (.punsubscribe pub-sub)
    (log/info ::shutdown-step "Returning redis client to the pool.")
    (.close client)
    (log/info ::shutdown-step "Shutting down the executor")
    (.shutdownNow executor)))
;; ---------------------------------------------------------------------------- | |
(defn attach-user-session!
  "Registers `callback` under `user-id` in the subsystem's connections atom
  by conj-ing it onto whatever is already stored for that user. Returns the
  updated connections map."
  [chat-subsystem user-id callback]
  (let [connections (::connections chat-subsystem)]
    (swap! connections update user-id conj callback)))
;; ---------------------------------------------------------------------------- | |
(defn remove-user-session!
  "Drops every occurrence of `callback` from the callbacks stored under
  `user-id` in the subsystem's connections atom. When nothing is left for
  that user the entry itself is removed. Returns the updated connections
  map."
  [chat-subsystem user-id callback]
  (let [matches-callback? #{callback}
        prune (fn [users]
                (let [remaining (filter (complement matches-callback?)
                                        (users user-id))]
                  (if (seq remaining)
                    (assoc users user-id remaining)
                    (dissoc users user-id))))]
    (swap! (::connections chat-subsystem) prune)))
;; ---------------------------------------------------------------------------- | |
(defn user-messages-streamer
  "Streamer taking `ctx`, ignored args and a `source-stream` callback.

  Logs the subscription, registers `source-stream` as a chat callback for
  the user found under `:user` in `ctx` (using the chat subsystem found
  under `:system`), and returns a zero-argument function that removes that
  registration again."
  [ctx _args source-stream]
  (let [user-id (:user/id (:user ctx))
        chat-subsystem (:chat-subsystem (:system ctx))]
    (log/info ::event :user-subscribing-to-messages
              ::user-id user-id)
    (attach-user-session! chat-subsystem user-id source-stream)
    (fn []
      (remove-user-session! chat-subsystem user-id source-stream))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment