-
-
Save jaen/cc8600e682eb65ec4a55 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns thesis.backend.components.async | |
(:require [com.stuartsierra.component :as component] | |
[taoensso.timbre :as log] | |
[taoensso.sente :as sente] | |
[taoensso.sente.server-adapters.immutant :as sente-immutant] | |
[taoensso.sente.packers.transit :as sente-transit] | |
[clojure.core.match :refer [match]] | |
[cuerdas.core :as str] | |
[clojure.core.async :as async] | |
[thesis.common.utils.transit :as transit-utils] | |
[thesis.backend.components.protocols :as protocols]) | |
(:import [clojure.lang MultiFn])) | |
(defn make-dispatch-method
  "Creates a fresh multimethod used to route async requests to handlers.

  The multimethod dispatches on the `:event` key of its single argument,
  uses `:default` as the fallback dispatch value and resolves `isa?`
  relationships through Clojure's global hierarchy. Handlers are attached
  afterwards with `.addMethod`."
  []
  ;; The name is only surfaced in error messages (for example when no method
  ;; matches a dispatch value), so it has to point at this namespace rather
  ;; than at the project this code was originally copied from.
  (MultiFn. "thesis.backend.components.async/async-dispatch-method"
            :event
            :default
            #'clojure.core/global-hierarchy))
(defn- make-request-map
  "Converts a sente event message into the request map given to handlers.

  The returned map carries the original message under `:ev-msg`, the event
  id under `:event`, the `:client-id`, the raw uid under `:user-id`, and the
  `:message-id` / `:payload` pulled out of the message's `?data`.

  A `:current-user` entry is added only when the uid does not look like a
  temporary id (one whose string form starts with \"tempid-\"). The user
  lookup itself is currently disabled, so that entry is always nil."
  [{:keys [uid client-id id ?data] :as ev-msg}]
  (log/debug "Event:" uid ?data)
  (let [{:keys [payload message-id]} ?data
        temporary?   (str/starts-with? (str uid) "tempid-")
        user-id      (if temporary? nil uid)
        ;; Placeholder until the user lookup is wired back in.
        current-user nil]
    (log/debug "client id" user-id)
    (cond-> {:ev-msg     ev-msg
             :event      id
             :client-id  client-id
             :user-id    uid
             :message-id message-id
             :payload    payload}
      user-id (assoc :current-user current-user))))
(defn make-event-handler
  "Builds the function that the sente router calls for every event message.

  Handshake and channel-state events are only logged. Every other event,
  `:chsk/recv` included, is converted with `make-request-map` and passed to
  `dispatch-fn`, whose result becomes the handler's return value."
  [dispatch-fn]
  (let [forward! (fn [ev-msg]
                   (dispatch-fn (make-request-map ev-msg)))]
    (fn [{:keys [event] :as ev-msg}]
      (let [[event-id] event]
        (log/debug "Event:" event)
        (log/debug "Event id:" event-id)
        (match event
          [:chsk/handshake _]
          (log/debug "Got handshake!")

          [:chsk/state {:first-open? true}]
          (log/debug "Channel socket successfully established!")

          [:chsk/state state]
          (log/debug "Chsk state change:" state)

          ;; Anything that is not a handshake/state notification is
          ;; application traffic and goes to the dispatcher.
          :else
          (forward! ev-msg))))))
(defn- start-async!
  "Creates the sente channel socket, starts its router and returns a map of
  the pieces the component needs.

  `dispatch` is the function that receives every request map produced by the
  event handler. The returned map contains:

    :send!        - (send! uid event payload) or (send! uid event payload metadata);
                    wraps the payload as {:payload payload} merged with metadata
                    and pushes [event wrapped] through sente
    :uids         - the socket's :connected-uids value
    :handshake-fn - the socket's :ajax-get-or-ws-handshake-fn
    :ajax-post-fn - the socket's :ajax-post-fn
    :stop!        - zero-arg function that stops the router and closes the
                    receive channel"
  [dispatch]
  (let [packer (sente-transit/->TransitPacker
                 :json
                 {:handlers transit-utils/transit-write-handlers}
                 {:handlers transit-utils/transit-read-handlers})
        ;; Token-based identification is disabled for now: the sente user id
        ;; is simply the request's client id.
        client-id-of (fn [request] (:client-id request))
        {:keys [ch-recv send-fn connected-uids
                ajax-get-or-ws-handshake-fn ajax-post-fn]}
        (sente/make-channel-socket! sente-immutant/immutant-adapter
                                    {:user-id-fn client-id-of
                                     :packer     packer})
        stop-router! (sente/start-chsk-router! ch-recv
                                               (make-event-handler dispatch))]
    (letfn [(push!
              ([uuid event payload]
               (push! uuid event payload nil))
              ([uuid event payload metadata]
               (let [envelope (cond-> {:payload payload}
                                metadata (merge metadata))]
                 (log/debug "sending payload:" envelope)
                 (send-fn uuid [event envelope]))))
            (shutdown! []
              (stop-router!)
              (async/close! ch-recv))]
      {:send!        push!
       :uids         connected-uids
       :handshake-fn ajax-get-or-ws-handshake-fn
       :ajax-post-fn ajax-post-fn
       :stop!        shutdown!})))
;; Component that owns the sente channel socket.
;;
;; On start it builds a dispatch multimethod, registers the async handlers
;; contributed by every value in the component that satisfies
;; `protocols/AsyncHandlerProvider`, and then starts the socket and router.
;; It also exposes the sente handshake / ajax-post ring handlers through
;; `protocols/WebServiceProvider`.
(defrecord Async [dispatch-method ajax-post-fn handshake-fn send!]
  component/Lifecycle
  (start [component]
    (log/info "Starting async with: ")
    (let [dispatch-method (make-dispatch-method)
          handlers (->> (vals component)
                        (filter #(satisfies? protocols/AsyncHandlerProvider %))
                        (mapcat protocols/-get-async-handlers))]
      ;; Register every handler (and the fallback) BEFORE the router starts,
      ;; so an event arriving right after startup never reaches a
      ;; multimethod that has no methods yet.
      (doseq [[dispatch-value handler] handlers]
        (log/debug "Registering event handler for" dispatch-value)
        (.addMethod ^MultiFn dispatch-method dispatch-value handler))
      (.addMethod ^MultiFn dispatch-method :default
                  (fn [event]
                    (log/debug "Unhandled event type: " (dissoc event :ev-msg))))
      (let [{:keys [send! handshake-fn ajax-post-fn stop!]} (start-async! dispatch-method)]
        (assoc component
               :dispatch-method dispatch-method
               :handshake-fn handshake-fn
               :ajax-post-fn ajax-post-fn
               :send! send!
               ;; Kept under a namespaced, non-field key so the record's
               ;; positional constructor stays unchanged.
               ::stop! stop!))))
  (stop [component]
    (if-not dispatch-method
      ;; Not started: stop must still return the component, never nil.
      component
      (do
        (log/info "Stopping async.")
        ;; Shut down the sente router and receive channel created in start;
        ;; without this they keep running after the component is stopped.
        (when-let [stop-async! (::stop! component)]
          (stop-async!))
        ;; Clear the declared fields with assoc: dissoc on a record field
        ;; would turn the record into a plain map and lose its Lifecycle
        ;; implementation. ::stop! is not a declared field, so dissoc is safe.
        (-> component
            (assoc :dispatch-method nil
                   :handshake-fn nil
                   :ajax-post-fn nil
                   :send! nil)
            (dissoc ::stop!)))))
  protocols/WebServiceProvider
  (-get-routes [_]
    [["sente" {:get  :async/handshake-fn
               :post :async/ajax-post-fn}]])
  (-get-handlers [this]
    {:async/handshake-fn handshake-fn
     :async/ajax-post-fn ajax-post-fn}))
(defn make
  "Returns a new, unstarted Async component.

  Any arguments are accepted for call-site compatibility but are currently
  ignored."
  [& _options]
  (map->Async {}))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment