-
-
Save mtnygard/3c05c26f4cfad420eee6ce32ac03819f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Copyright (c) 2017 Cognitect, Inc. All rights reserved. | |
(ns control-server.broadcaster | |
(:require [clojure.core.async :as a] | |
[com.stuartsierra.component :as component] | |
[control-server.controls :as controls] | |
[datomic.api :as d] | |
[io.pedestal.http.sse :as http.sse] | |
[io.pedestal.log :as log] | |
[simulator.clock :as clock] | |
[control-server.database :as database])) | |
;; Network event streams | |
;; ---------------------- | |
(def clock-event-type "clock") | |
(def control-state-event-type "control_state") | |
(def network-event-clients (atom {})) | |
(def client-key :response-channel) | |
(defn- update-client! | |
[ctx data] | |
(swap! network-event-clients update (get ctx client-key) merge data)) | |
(defn network-events-ready! | |
[event-ch ctx] | |
(update-client! ctx {:event-channel event-ch}) | |
(a/put! event-ch {:name clock-event-type :data clock/zero}) | |
event-ch) | |
(defn send-network-event! | |
[event-data] | |
(let [sse-event event-data] | |
(doseq [{:keys [event-channel]} (vals @network-event-clients)] | |
(a/put! event-channel sse-event))) | |
event-data) | |
(defn network-events-disconnect! | |
[ctx] | |
;; Pedestal already closes the response and event channel, so we just need to clean up | |
(swap! network-event-clients dissoc (get ctx client-key)) | |
ctx) | |
(defn killall-clients! [] | |
(doseq [[r-ch {e-ch :event-channel}] @network-event-clients] | |
(swap! network-event-clients dissoc r-ch) | |
(a/close! e-ch) | |
(a/close! r-ch))) | |
;; Transaction queue listener | |
;; On transaction, gets state of controls, sends to SSE stream | |
;; | |
;; TODO: filter to just txes involving controls. | |
(defn tx-report->channel | |
[conn ch] | |
(let [q (d/tx-report-queue conn)] | |
(a/thread | |
(try | |
(loop [] | |
(let [tx-report (.take q)] | |
(when (and tx-report (a/put! ch tx-report)) | |
(recur)))) | |
(catch Throwable t | |
(log/error :exception t :message "Aborting tx-report->channel loop") | |
(a/close! ch)) | |
(finally | |
(d/remove-tx-report-queue conn)))))) | |
(defn channel->network | |
[ch] | |
(a/go-loop [v (a/<! ch)] | |
(when v | |
(log/trace :message "Sending control update to clients" :data v) | |
(send-network-event! v) | |
(recur (a/<! ch))))) | |
(defn log-error | |
[from] | |
(fn [ex] | |
(log/error :exception ex :from from) | |
nil)) | |
(def control-events (http.sse/start-event-stream network-events-ready! 10 10 {:on-client-disconnect network-events-disconnect!})) | |
(defn all-controls | |
[db] | |
(d/q '[:find [(pull ?c [* | |
{:control/actual-state [:db/ident]} | |
{:control/desired-state [:db/ident]}]) ...] | |
:where [?c :control/name]] | |
db)) | |
(defn- control-state-event | |
[db] | |
{:name control-state-event-type | |
:data (all-controls db)}) | |
(defn- clock-event | |
[db] | |
{:name clock-event-type | |
:data (clock/query-clock db)}) | |
(defn- classify-tx-report | |
[control-idents actual-state clock-ident] | |
(fn [rf] | |
(fn | |
([] (rf)) | |
([result] (rf result)) | |
([result tx-report] | |
(let [involved-entities (distinct (map #(.e %) (:tx-data tx-report))) | |
attributes (distinct (map #(.a %) (:tx-data tx-report))) | |
control? (and (some control-idents involved-entities) | |
(some #{actual-state} attributes)) | |
clock? (some #{clock-ident} involved-entities)] | |
(cond-> result | |
control? | |
(rf (control-state-event (:db-after tx-report))) | |
clock? | |
(rf (clock-event (:db-after tx-report))))))))) | |
(defn- control-eids | |
[db] | |
(into #{} (mapcat identity (d/q '[:find ?e :where [?e :control/name]] db)))) | |
;; Lifecycle | |
;; --------- | |
(defrecord Reactor [database datomic-uri controls conn routes tx-ch] | |
component/Lifecycle | |
(start [this] | |
(log/info :message "Starting event broadcaster") | |
(let [conn (database/connection database) | |
db (d/db conn) | |
control-eids (control-eids db) | |
clock-eid (d/entid db :world-clock) | |
actual-state (d/entid db :control/actual-state) | |
xf (classify-tx-report control-eids actual-state clock-eid) | |
tx-ch (a/chan (a/dropping-buffer 100) xf (log-error :tx-report-channel))] | |
(tx-report->channel conn tx-ch) | |
(channel->network tx-ch) | |
(assoc this | |
:conn conn | |
:routes #{["/api/control-server/v1/control-events" :get control-events]} | |
:tx-ch tx-ch))) | |
(stop [this] | |
(log/info :message "Stopping event broadcaster") | |
(when conn | |
(d/remove-tx-report-queue conn)) | |
(when tx-ch | |
(a/close! tx-ch)) | |
(killall-clients!) | |
(assoc this | |
:conn nil | |
:tx-ch nil | |
:routes nil))) | |
(defn reactor [] | |
(map->Reactor {})) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment