@joinr
Created March 27, 2019 21:41
an example of a core.async process manager thing
(ns system-example
  (:require [spork.async.system :as sys]
            [clojure.core.async :as async :refer [chan]]))
(def ins (chan (async/dropping-buffer 1)))
(def outs (chan (async/dropping-buffer 1)))
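;;A note on the buffers above, as a minimal sketch: a dropping buffer of
;;size 1 holds one value and silently discards further puts until that
;;value is taken, so a put never blocks on a slow reader.
;;demo-chan is a hypothetical name used only for this illustration.
(def demo-chan (chan (async/dropping-buffer 1)))
(async/>!! demo-chan :first)  ;buffered
(async/>!! demo-chan :second) ;buffer is full, so this value is dropped
(async/<!! demo-chan)
;;:first
(async/poll! demo-chan)
;;nil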
(sys/go-push :clock ins (let [_ (async/<! (async/timeout 1000))]
                          (System/currentTimeMillis)))
(sys/go-pull :reader ins n (do (println n) (async/put! outs n)))
;;This prints a timestamp to the REPL roughly once per second:
;;1553716940150
;;1553716941152
;;1553716942155
;;1553716943156
;;1553716944158
;;1553716945159
(sys/stop! :clock)
;;#system-example/eval20514/reify--20580[{:status :ready, :val :stopped}
;;0x33ceb77d]
;;[:clock "stopping"]
(sys/start! :clock)
;;#system-example/eval20514/reify--20580[{:status :ready, :val :started}
;;0x33ceb77d]
;;1553717077725
;;1553717078725
;;1553717079725
;;1553717080725
(sys/stop! :clock)
;;#system-example/eval20514/reify--20580[{:status :ready, :val :stopped}
;;0x33ceb77d]
;;[:clock "stopping"]
;;Even though :clock is stopped, :reader is still
;;active. We can see this via sys/active...
(sys/active)
;;([:reader
;;#system-example/eval20593/reify--20660[{:status :ready, :val :started}
;;0x498a6f5c]])
(sys/inactive)
;;([:clock
;;#system-example/eval20514/reify--20580[{:status :ready, :val :stopped}
;;0x33ceb77d]])
;;So we have named processes that we can stop, start, etc.
;;We can use kill-all! to eliminate all processes, running or stopped,
;;which cleans up for us, or use stop! and prune! to selectively
;;clear only the stopped processes...
(sys/kill-all!)
;;[:reader "stopping"]
;;We can replace an existing process by defining a new one with the same name:
;;the old process should be stopped, and the new one started in its place.
;;This time, we'll have each reader stop itself after 3 ticks.
;;We'll use some process-local state (in this case
;;an atom) to keep track of the ticks remaining.
(defn pusher [delay]
  (sys/go-push :clock ins
    (let [_ (async/<! (async/timeout delay))]
      (System/currentTimeMillis))))
(defn puller [name limit]
  (let [remaining (atom limit)]
    (sys/go-pull name ins n
      (if (> @remaining 0)
        (do (println [name n :remaining (swap! remaining dec)])
            (async/put! outs n))
        ;;reduced items stop the go-loop
        (reduced n)))))
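;;As a rough sketch of the idea in plain core.async (an assumption about
;;the general technique, not spork's actual implementation): a pull loop
;;can apply a function to each value and exit once the result is reduced.
;;pull-until-reduced and demo-in are hypothetical names.
(defn pull-until-reduced [in f]
  (async/go-loop []
    (when-some [v (async/<! in)] ;nil means the channel was closed
      (let [res (f v)]
        (if (reduced? res)
          @res                   ;unwrap the value and exit the loop
          (recur))))))
(def demo-in (chan 5))
(doseq [x [1 2 3 4 5]] (async/>!! demo-in x))
(async/<!! (pull-until-reduced demo-in #(if (< % 3) % (reduced %))))
;;3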
(pusher 1000)
(puller :reader 3)
(puller :other-reader 3)
;; [:reader 1553721780297 :remaining 2]
;; [:other-reader 1553721780609 :remaining 2]
;; [:reader 1553721781297 :remaining 1]
;; [:other-reader 1553721781610 :remaining 1]
;; [:reader 1553721782298 :remaining 0]
;; [:other-reader 1553721782610 :remaining 0]
(sys/inactive)
;; ([:reader #system-example/puller/reify--24441[{:status :ready, :val :stopped}
;; 0x528b8088]]
;; [:other-reader
;; #system-example/puller/reify--24441[{:status :ready, :val :stopped}
;; 0x6031e8b7]])
(sys/kill-all!)
;;You can wire up communications with watchers on atoms
;;for tighter, event-driven control, where the watchers
;;push values to channels. There are all kinds of control
;;constructs to mix and match.
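;;A minimal sketch of that watcher idea, using only clojure.core and
;;core.async (state, changes and :notify are hypothetical names for
;;this illustration):
(def state (atom 0))
(def changes (chan (async/sliding-buffer 1)))
(add-watch state :notify
           (fn [_key _ref old-val new-val]
             (when (not= old-val new-val)
               (async/put! changes new-val)))) ;push each new value to the channel
(swap! state inc)
(async/<!! changes)
;;1
(remove-watch state :notify)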
;;The original use case was a java2d animation system,
;;communicating via channels and core.async processes
;;for rendering and controlling widgets.