Created
March 27, 2019 21:41
-
-
Save joinr/b6144113321c9582120cc3a1d9293f3c to your computer and use it in GitHub Desktop.
an example of a core.async process manager thing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns system-example | |
(:require [spork.async.system :as sys] | |
[clojure.core.async :as async :refer [chan]])) | |
;; Both channels use a one-slot dropping buffer: a put always completes,
;; and the value is simply discarded when the slot is already occupied.
(def ins  (chan (async/dropping-buffer 1)))
(def outs (chan (async/dropping-buffer 1)))

;; :clock -- each cycle waits one second, then yields the current time in
;; milliseconds. sys/go-push presumably puts that value onto `ins`
;; (the transcript below shows :reader receiving it).
;; The value taken from the timeout channel is irrelevant, so it is
;; discarded instead of being bound to an unused local.
(sys/go-push :clock ins
  (do (async/<! (async/timeout 1000))
      (System/currentTimeMillis)))

;; :reader -- binds each value arriving on `ins` to `n`, prints it, and
;; forwards it to `outs`.
(sys/go-pull :reader ins n
  (do (println n)
      (async/put! outs n)))
;;this gets us some values printed to the repl.. | |
;;1553716940150 | |
;;1553716941152 | |
;;1553716942155 | |
;;1553716943156 | |
;;1553716944158 | |
;;1553716945159 | |
(sys/stop! :clock) | |
;;#system-example/eval20514/reify--20580[{:status :ready, :val :stopped} | |
;;0x33ceb77d] | |
;;[:clock "stopping"] | |
(sys/start! :clock) | |
;;#system-example/eval20514/reify--20580[{:status :ready, :val :started} | |
;;0x33ceb77d] | |
;;1553717077725 | |
;;1553717078725 | |
;;1553717079725 | |
;;1553717080725 | |
(sys/stop! :clock) | |
;;#system-example/eval20514/reify--20580[{:status :ready, :val :stopped} | |
;;0x33ceb77d] | |
;;[:clock "stopping"] | |
;;Even though :clock is stopped, :reader is still | |
;;active. We can see this via sys/active... | |
(sys/active) | |
;;([:reader | |
;;#system-example/eval20593/reify--20660[{:status :ready, :val :started} | |
;;0x498a6f5c]]) | |
(sys/inactive) | |
;;([:clock | |
;;#system-example/eval20514/reify--20580[{:status :ready, :val :stopped} | |
;;0x33ceb77d]]) | |
;;So we have named processes that we can stop, start, etc. | |
;;we can use kill-all! to eliminate any running or stopped processes, | |
;;which will clean up for us, or use stop! and prune! to selectively | |
;;clear only stopped processes... | |
(sys/kill-all!) | |
;;[:reader "stopping"] | |
;;We can replace existing processes by defining same-named processes:
;;the legacy process should be stopped, then started with the new behavior.
;;This time, we'll have each reader shut itself down after 3 ticks
;;(the clock is not stopped by this code). We'll use some process-local
;;state (in this case an atom) to keep track of ticks.
(defn pusher
  "Defines the process named :clock via sys/go-push, targeting the `ins`
  channel. Each cycle waits `delay` milliseconds on a timeout channel and
  then yields (System/currentTimeMillis).

  delay - pause between ticks, in milliseconds (passed to async/timeout).

  Returns whatever sys/go-push returns (presumably the process handle that
  the REPL transcript prints -- confirm against spork.async.system)."
  [delay]
  (sys/go-push :clock ins
    ;; Only the pause matters: the value taken from the timeout channel is
    ;; discarded rather than bound to an unused local.
    (do (async/<! (async/timeout delay))
        (System/currentTimeMillis))))
(defn puller
  "Defines a process called `name` via sys/go-pull that reads values from
  `ins`, binding each one to `n`.

  While the private countdown (initialised to `limit`) is positive, every
  value decrements it, prints [name n :remaining <count>] and is forwarded
  to `outs`. Once the countdown has reached zero, the next value is
  returned wrapped in `reduced`, which stops the go-loop."
  [name limit]
  (let [ticks-left (atom limit)]   ; process-local countdown
    (sys/go-pull name ins n
      (if-not (pos? @ticks-left)
        ;; a reduced item stops the go-loop
        (reduced n)
        (let [left (swap! ticks-left dec)]
          (println [name n :remaining left])
          (async/put! outs n))))))
(pusher 1000) | |
(puller :reader 3) | |
(puller :other-reader 3) | |
;; [:reader 1553721780297 :remaining 2] | |
;; [:other-reader 1553721780609 :remaining 2] | |
;; [:reader 1553721781297 :remaining 1] | |
;; [:other-reader 1553721781610 :remaining 1] | |
;; [:reader 1553721782298 :remaining 0] | |
;; [:other-reader 1553721782610 :remaining 0] | |
(sys/inactive) | |
;; ([:reader #system-example/puller/reify--24441[{:status :ready, :val :stopped} | |
;; 0x528b8088]] | |
;; [:other-reader | |
;; #system-example/puller/reify--24441[{:status :ready, :val :stopped} | |
;; 0x6031e8b7]]) | |
(sys/kill-all!) | |
;;You can wire up communications with watchers on atoms
;;for tighter, event-driven control, where the watchers
;;push values to channels. There are all kinds of control
;;constructs to mix and match.
;;The original use case was a java2d animation system, | |
;;communicating via channels and core.async processes | |
;;for rendering and controlling widgets. | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment