Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
an example of a core.async process manager thing
(ns system-example
(:require [spork.async.system :as sys]
[clojure.core.async :as async :refer [chan]]))
(def ins (chan (async/dropping-buffer 1)))
(def outs (chan (async/dropping-buffer 1)))
(sys/go-push :clock ins (let [res (async/<! (async/timeout 1000))]
(System/currentTimeMillis)))
(sys/go-pull :reader ins n (do (println n) (async/put! outs n)))
;;this gets us some values printed to the repl..
;;1553716940150
;;1553716941152
;;1553716942155
;;1553716943156
;;1553716944158
;;1553716945159
(sys/stop! :clock)
;;#system-example/eval20514/reify--20580[{:status :ready, :val :stopped}
;;0x33ceb77d]
;;[:clock "stopping"]
(sys/start! :clock)
;;#system-example/eval20514/reify--20580[{:status :ready, :val :started}
;;0x33ceb77d]
;;1553717077725
;;1553717078725
;;1553717079725
;;1553717080725
(sys/stop! :clock)
;;#system-example/eval20514/reify--20580[{:status :ready, :val :stopped}
;;0x33ceb77d]
;;[:clock "stopping"]
;;Even though :clock is stopped, :reader is still
;;active. We can see this via sys/active...
(sys/active)
;;([:reader
;;#system-example/eval20593/reify--20660[{:status :ready, :val :started}
;;0x498a6f5c]])
(sys/inactive)
;;([:clock
;;#system-example/eval20514/reify--20580[{:status :ready, :val :stopped}
;;0x33ceb77d]])
;;So we have named processes that we can stop, start, etc.
;;we can use kill-all! to eliminate any running or stopped processes,
;;which will clean up for us, or use stop! and prune! to selectively
;;clear only stopped processes...
(sys/kill-all!)
;;[:reader "stopping"]
;;We can replace existing processes by defining same-named processes,
;;the legacies should be stopped, then started with the new behavior:
;;This time, we'll have the reader turn off the clock after
;;3 ticks. We'll use some process-local state (in this case
;;an atom) to keep track of ticks.
(defn pusher [delay]
(sys/go-push :clock ins
(let [res (async/<! (async/timeout delay))]
(System/currentTimeMillis))))
(defn puller [name limit]
(let [remaining (atom limit)]
(sys/go-pull name ins n
(if (> @remaining 0)
(do (println [name n :remaining (swap! remaining dec)])
(async/put! outs n))
;;reduced items stop the go-loop
(reduced n)))))
(pusher 1000)
(puller :reader 3)
(puller :other-reader 3)
;; [:reader 1553721780297 :remaining 2]
;; [:other-reader 1553721780609 :remaining 2]
;; [:reader 1553721781297 :remaining 1]
;; [:other-reader 1553721781610 :remaining 1]
;; [:reader 1553721782298 :remaining 0]
;; [:other-reader 1553721782610 :remaining 0]
(sys/inactive)
;; ([:reader #system-example/puller/reify--24441[{:status :ready, :val :stopped}
;; 0x528b8088]]
;; [:other-reader
;; #system-example/puller/reify--24441[{:status :ready, :val :stopped}
;; 0x6031e8b7]])
(sys/kill-all!)
;;You can wire up communications with watchers on atoms
;;for tighter, even-driven control, where the watchers
;;push values to channels. There all kinds of control
;;constructs to mix and match.
;;The original use case was a java2d animation system,
;;communicating via channels and core.async processes
;;for rendering and controlling widgets.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.