Skip to content

Instantly share code, notes, and snippets.

Created March 15, 2017 23:11
Show Gist options
  • Save mtnygard/3c05c26f4cfad420eee6ce32ac03819f to your computer and use it in GitHub Desktop.
Save mtnygard/3c05c26f4cfad420eee6ce32ac03819f to your computer and use it in GitHub Desktop.
;; Copyright (c) 2017 Cognitect, Inc. All rights reserved.
(ns control-server.broadcaster
(:require [clojure.core.async :as a]
[com.stuartsierra.component :as component]
[control-server.controls :as controls]
[datomic.api :as d]
[io.pedestal.http.sse :as http.sse]
[io.pedestal.log :as log]
[simulator.clock :as clock]
[control-server.database :as database]))
;; Network event streams
;; ----------------------
(def clock-event-type "clock")
(def control-state-event-type "control_state")
(def network-event-clients (atom {}))
(def client-key :response-channel)
(defn- update-client!
[ctx data]
(swap! network-event-clients update (get ctx client-key) merge data))
(defn network-events-ready!
[event-ch ctx]
(update-client! ctx {:event-channel event-ch})
(a/put! event-ch {:name clock-event-type :data clock/zero})
(defn send-network-event!
(let [sse-event event-data]
(doseq [{:keys [event-channel]} (vals @network-event-clients)]
(a/put! event-channel sse-event)))
(defn network-events-disconnect!
;; Pedestal already closes the response and event channel, so we just need to clean up
(swap! network-event-clients dissoc (get ctx client-key))
(defn killall-clients! []
(doseq [[r-ch {e-ch :event-channel}] @network-event-clients]
(swap! network-event-clients dissoc r-ch)
(a/close! e-ch)
(a/close! r-ch)))
;; Transaction queue listener
;; On transaction, gets state of controls, sends to SSE stream
;; TODO: filter to just txes involving controls.
(defn tx-report->channel
[conn ch]
(let [q (d/tx-report-queue conn)]
(loop []
(let [tx-report (.take q)]
(when (and tx-report (a/put! ch tx-report))
(catch Throwable t
(log/error :exception t :message "Aborting tx-report->channel loop")
(a/close! ch))
(d/remove-tx-report-queue conn))))))
(defn channel->network
(a/go-loop [v (a/<! ch)]
(when v
(log/trace :message "Sending control update to clients" :data v)
(send-network-event! v)
(recur (a/<! ch)))))
(defn log-error
(fn [ex]
(log/error :exception ex :from from)
(def control-events (http.sse/start-event-stream network-events-ready! 10 10 {:on-client-disconnect network-events-disconnect!}))
(defn all-controls
(d/q '[:find [(pull ?c [*
{:control/actual-state [:db/ident]}
{:control/desired-state [:db/ident]}]) ...]
:where [?c :control/name]]
(defn- control-state-event
{:name control-state-event-type
:data (all-controls db)})
(defn- clock-event
{:name clock-event-type
:data (clock/query-clock db)})
(defn- classify-tx-report
[control-idents actual-state clock-ident]
(fn [rf]
([] (rf))
([result] (rf result))
([result tx-report]
(let [involved-entities (distinct (map #(.e %) (:tx-data tx-report)))
attributes (distinct (map #(.a %) (:tx-data tx-report)))
control? (and (some control-idents involved-entities)
(some #{actual-state} attributes))
clock? (some #{clock-ident} involved-entities)]
(cond-> result
(rf (control-state-event (:db-after tx-report)))
(rf (clock-event (:db-after tx-report)))))))))
(defn- control-eids
(into #{} (mapcat identity (d/q '[:find ?e :where [?e :control/name]] db))))
;; Lifecycle
;; ---------
(defrecord Reactor [database datomic-uri controls conn routes tx-ch]
(start [this]
(log/info :message "Starting event broadcaster")
(let [conn (database/connection database)
db (d/db conn)
control-eids (control-eids db)
clock-eid (d/entid db :world-clock)
actual-state (d/entid db :control/actual-state)
xf (classify-tx-report control-eids actual-state clock-eid)
tx-ch (a/chan (a/dropping-buffer 100) xf (log-error :tx-report-channel))]
(tx-report->channel conn tx-ch)
(channel->network tx-ch)
(assoc this
:conn conn
:routes #{["/api/control-server/v1/control-events" :get control-events]}
:tx-ch tx-ch)))
(stop [this]
(log/info :message "Stopping event broadcaster")
(when conn
(d/remove-tx-report-queue conn))
(when tx-ch
(a/close! tx-ch))
(assoc this
:conn nil
:tx-ch nil
:routes nil)))
(defn reactor []
(map->Reactor {}))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment