Skip to content

Instantly share code, notes, and snippets.

@jaen

jaen/async.clj Secret

Created February 9, 2016 20:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jaen/cc8600e682eb65ec4a55 to your computer and use it in GitHub Desktop.
Save jaen/cc8600e682eb65ec4a55 to your computer and use it in GitHub Desktop.
(ns thesis.backend.components.async
(:require [com.stuartsierra.component :as component]
[taoensso.timbre :as log]
[taoensso.sente :as sente]
[taoensso.sente.server-adapters.immutant :as sente-immutant]
[taoensso.sente.packers.transit :as sente-transit]
[clojure.core.match :refer [match]]
[cuerdas.core :as str]
[clojure.core.async :as async]
[thesis.common.utils.transit :as transit-utils]
[thesis.backend.components.protocols :as protocols])
(:import [clojure.lang MultiFn]))
(defn make-dispatch-method []
(new MultiFn "scrabble.backend.components.async/async-dispatch-method"
:event
:default
#'clojure.core/global-hierarchy))
(defn- make-request-map [{:as ev-msg :keys [uid client-id id ?data ?reply-fn]}]
(log/debug "Event:" uid ?data)
(let [{:keys [payload message-id]} ?data
;[valid? {{user-id :user} :iss}] (authentication/verify-token uid)
user-id (when-not (str/starts-with? (str uid) "tempid-") uid)
_ (log/debug "client id" user-id)
current-user nil #_(when user-id (future (users/find-by {:id user-id})))
request (merge {:ev-msg ev-msg
:event id
:client-id client-id
:user-id uid
:message-id message-id
:payload payload}
(when user-id {:current-user current-user}))]
request))
(defn make-event-handler [dispatch-fn]
(fn [{:keys [event send-fn] :as ev-msg}]
(let [[id data :as ev] event]
;(log/debug "Ev-msg:" ev-msg)
(log/debug "Event:" event)
(log/debug "Event id:" id )
(match event ; [id data]
[:chsk/handshake _] (log/debug "Got handshake!")
[:chsk/state {:first-open? true}] (log/debug "Channel socket successfully established!")
[:chsk/state new-state] (log/debug "Chsk state change:" new-state)
[:chsk/recv payload] (dispatch-fn (make-request-map ev-msg))
;[:chsk/uidport-open]
;[:chsk/uidport-close]
#_[:chsk/recv payload] #_(do
(log/debug "Push event from server")
(base-handler ev-msg))
:else (dispatch-fn (make-request-map ev-msg))))))
(defn- start-async! [dispatch]
(let [user-id-fn (fn [request]
(:client-id request)
#_(let [client-id (:client-id request)
[valid? token] (authentication/verify-token client-id)
user-id (get-in token [:iss :user :id])]
(log/info "Async user id is:" user-id)
(if valid?
user-id
client-id)))
sente-packer (sente-transit/->TransitPacker :json {:handlers transit-utils/transit-write-handlers}
{:handlers transit-utils/transit-read-handlers})
sente-socket (sente/make-channel-socket! sente-immutant/immutant-adapter {:user-id-fn user-id-fn
:packer sente-packer})
receive-chan (:ch-recv sente-socket)
sente-send! (:send-fn sente-socket)
connected-uids (:connected-uids sente-socket)
handle-receive (make-event-handler dispatch)
sente-router-stop! (sente/start-chsk-router! receive-chan handle-receive)
send-fn (fn send!
([uuid event payload]
(send! uuid event payload nil))
([uuid event payload metadata]
(let [payload (merge {:payload payload}
(when metadata
metadata))]
(log/debug "sending payload:" payload)
(sente-send! uuid [event payload]))))
stop-fn (fn []
(sente-router-stop!)
(async/close! receive-chan))]
{:send! send-fn
:uids connected-uids
:handshake-fn (:ajax-get-or-ws-handshake-fn sente-socket)
:ajax-post-fn (:ajax-post-fn sente-socket)
:stop! stop-fn}))
(defrecord Async [dispatch-method ajax-post-fn handshake-fn send!]
component/Lifecycle
(start [component]
(log/info "Starting async with: " )
(let [dispatch-method (make-dispatch-method)
handlers (->> (vals component)
(filter #(satisfies? protocols/AsyncHandlerProvider %))
(mapcat protocols/-get-async-handlers))
{:keys [send! handshake-fn ajax-post-fn]} (start-async! dispatch-method)]
;(log/debug "HALF KEK: " (vals component))
;(log/debug "KEK: " (vec (filter #(satisfies? logic/AsyncHandlerProvider %) (vals component))))
(doseq [[dispatch-value handler] handlers]
(log/debug "Registering event handler for" dispatch-value)
(.addMethod ^MultiFn dispatch-method dispatch-value handler))
(.addMethod ^MultiFn dispatch-method :default
(fn [event] (log/debug "Unhandled event type: " (dissoc event :ev-msg))))
(assoc component :dispatch-method dispatch-method
:handshake-fn handshake-fn
:ajax-post-fn ajax-post-fn
:send! send!)))
(stop [component]
(when dispatch-method
(log/info "Stopping async.")
(dissoc component :dispatch-method :handshake-fn :ajax-post-fn :send!)))
protocols/WebServiceProvider
(-get-routes [_]
[["sente" {:get :async/handshake-fn
:post :async/ajax-post-fn}]])
(-get-handlers [this]
{:async/handshake-fn handshake-fn
:async/ajax-post-fn ajax-post-fn}))
(defn make [& [options]]
(map->Async {}))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment