Skip to content

Instantly share code, notes, and snippets.

@niwinz
Last active November 22, 2017 11:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save niwinz/083f5eb6ee52001ec5d30a94e2e3dc41 to your computer and use it in GitHub Desktop.
Save niwinz/083f5eb6ee52001ec5d30a94e2e3dc41 to your computer and use it in GitHub Desktop.
Recycle Iteration
;; BOOT_CLOJURE_NAME=org.clojure/clojure
;; BOOT_CLOJURE_VERSION=1.9.0-RC1
;; BOOT_VERSION=2.7.2
;; BOOT_JVM_OPTIONS="-Xms4g -Xmx4g -XX:+UseG1GC -XX:+AggressiveOpts -server"
(set-env! :dependencies '[[org.clojure/core.async "0.3.443"]
[org.clojure/clojure "1.9.0-RC1"]])
(require '[clojure.core.async :as a]
'[clojure.core.async.impl.protocols :as ap])
;; --- Helpers
(defn- noop
  "Accepts any number of arguments, ignores them and returns nil.
  Serves as the default for optional service hooks."
  [& _]
  nil)
(defn- service?
  "Looks up the `::service` marker in `v`. Returns the marker value (true
  for maps built by `service`) or nil when it is absent."
  [v]
  (get v ::service))
(defn- chan?
  "True when `candidate` satisfies the core.async `ReadPort` protocol,
  i.e. it is something values can be taken from."
  [candidate]
  (satisfies? ap/ReadPort candidate))
(defmacro try-on
  "Evaluates `body` and yields its value. If evaluation raises any
  Throwable, the throwable itself is returned as a value instead of
  propagating."
  [& body]
  (let [caught (gensym "caught__")]
    `(try
       (do ~@body)
       (catch Throwable ~caught ~caught))))
;; --- Main API
;; Dynamic fallback for the response timeout. `service` reads this var when a
;; service is created without a `:timeout` option, and uses 60000 ms when this
;; var is nil as well. Being dynamic, it can be rebound with `binding`.
(def ^:dynamic *timeout*
"Maximum number of milliseconds to wait for a response from a
service. This variable by default is nil, meaning the value used
when creating the service is used."
nil)
(defn- default-receive
  "Fallback `:receive` handler used when a service does not supply one.
  Ignores every argument and returns (does not throw) an `ex-info`
  describing the omission."
  [& _ignored]
  (let [message "service do not implement :receive"]
    (ex-info message {})))
(defn service
  "Create a service (a plain map) using the given keyword options as a
  blueprint.

  All of the options are optional:

  - `:init`: initialization hook; `start!` calls it with the config value
    (nil when none is given) and its return value becomes the internal
    state of the service. Defaults to a function returning nil.
  - `:stop`: termination hook; `stop!` calls it with the internal state so
    that resources can be released. Defaults to a function returning nil.
  - `:error`: error hook, kept in the service map under `::error`
    (defaults to `identity`).
    NOTE(review): nothing visible in this file invokes this hook yet;
    confirm the intended call site and arguments.
  - `:receive`: function called with the internal state followed by the
    arguments of the message sent to the service.
  - `:timeout`: default maximum number of milliseconds to wait for a
    response from the service; falls back to `*timeout*` and then to
    60000 (1 min).
  - `:buf-or-n`: buffer size (integer), a core.async buffer instance, or a
    zero-argument function returning a buffer; used for the service
    communication channel (default 1024).

  The returned service starts out in the stopped state."
  [& {:keys [init stop error receive buf-or-n timeout]
      :or {init noop
           stop noop
           error identity
           buf-or-n 1024
           receive default-receive}}]
  (let [;; `a/chan` accepts either a size or a buffer instance directly; only
        ;; callable values (buffer factories) need to be invoked first.
        buffer   (cond
                   (integer? buf-or-n) buf-or-n
                   (ifn? buf-or-n)     (buf-or-n)
                   :else               buf-or-n)
        inbox-ch (a/chan buffer)
        stop-ch  (a/chan 1)
        timeout  (or timeout *timeout* 60000)
        status   (atom ::stopped)
        local    (volatile! nil)]
    {::service  true
     ::buf-or-n buf-or-n
     ::init     init
     ::stop     stop
     ;; Keep the error hook instead of silently dropping it.
     ::error    error
     ::receive  receive
     ::inbox-ch inbox-ch
     ::stop-ch  stop-ch
     ::timeout  timeout
     ::status   status
     ::local    local}))
;; Forward declaration: `start!` calls `initialize-loop`, which is only
;; defined further down in the implementation section.
(declare initialize-loop)
(defn started?
  "True when the status atom of `service` currently holds `::started`,
  false otherwise. Asserts that the argument is a service."
  [service]
  {:pre [(service? service)]}
  (let [current @(::status service)]
    (= ::started current)))
(defn stopped?
  "True when the status atom of `service` currently holds `::stopped`,
  false otherwise. Asserts that the argument is a service."
  [service]
  {:pre [(service? service)]}
  (let [current @(::status service)]
    (= ::stopped current)))
(defn start!
  "Start `service` if it is currently stopped.

  Calls the `:init` hook with `config` (nil in the one-argument arity),
  stores the result as the local state and launches the message loop.

  Returns the service map extended with `::loop-ch` (the channel of the
  message loop), or nil when the service was not in the stopped state.
  If the init hook throws, the status is rolled back to stopped and the
  exception is rethrown, so the service can be started again later."
  ([service]
   (start! service nil))
  ([{:keys [::status ::local ::init] :as service} config]
   {:pre [(service? service)]}
   (when (compare-and-set! status ::stopped ::started)
     (try
       (vreset! local (init config))
       (catch Throwable cause
         ;; Without this rollback a failing init would leave the service
         ;; marked as started with no loop running, and unstartable.
         (reset! status ::stopped)
         (throw cause)))
     (assoc service ::loop-ch (initialize-loop service)))))
(defn stop!
  "Stop `service` if it is currently started.

  Signals the message loop through the stop channel, calls the `:stop`
  hook with the local state and then clears the local state. The state is
  cleared even when the hook throws (the exception still propagates).

  Returns nil."
  [{:keys [::status ::local ::stop-ch ::stop] :as service}]
  {:pre [(service? service)]}
  (when (compare-and-set! status ::started ::stopped)
    ;; Notify the internal loop that the service is stopped.
    (a/put! stop-ch true)
    (try
      (stop @local)
      (finally
        (vreset! local nil)))
    nil))
(defn with-state
  "Wraps a reply so that it also carries a new local state: returns a
  marker map holding `value` (the reply) and `state` (the state to
  install)."
  [value state]
  (assoc {::with-state true}
         ::value value
         ::local state))
(defn ask!
  "Send a message made of `args` to `service` and return a channel that
  will receive the outcome.

  The outcome is the reply produced by the service, or an `ex-info`
  value (not thrown) when either delivering the message or waiting for
  the reply exceeds the timeout. A single deadline covers both phases.

  The timeout in milliseconds is `*timeout*` when it is bound to a
  non-nil value, otherwise the timeout stored in the service."
  [{:keys [::inbox-ch ::timeout] :as service} & args]
  {:pre [(service? service)]}
  (let [reply-ch (a/chan 1)
        ;; Read the dynamic var here, on the caller's thread, so that a
        ;; surrounding `binding` is honoured.
        deadline (a/timeout (or *timeout* timeout))
        message  [reply-ch args]]
    (a/go
      (let [[_ port] (a/alts! [[inbox-ch message] deadline])]
        (if (identical? port deadline)
          (ex-info "put message to service timed out" {})
          (let [[reply port] (a/alts! [reply-ch deadline])]
            (if (identical? port deadline)
              (ex-info "take message result from service timed out" {})
              reply)))))))
(defn ask!!
  "Blocking variant of `ask!`: waits for the outcome on the calling
  thread. Throws the outcome when it is a Throwable, otherwise returns
  it."
  [service & args]
  (let [outcome (a/<!! (apply ask! service args))]
    (when (instance? Throwable outcome)
      (throw outcome))
    outcome))
;; --- Implementation
(defn- handle-message
  "Process one inbox message `[out args]` and deliver the reply on `out`.

  Calls the `:receive` function with the current local state and `args`;
  an exception thrown by that call is captured and delivered as the reply.
  While the result is a channel it is taken from until a plain value is
  obtained. A `with-state` result installs its state as the new local
  state and replies with its value.

  `out` is always closed afterwards. A nil reply is signalled by closing
  `out` without putting anything, because nil cannot be put on a channel.

  Returns the channel of the go block."
  [{:keys [::receive ::local]} [out args]]
  (a/go-loop [result (try-on (apply receive @local args))]
    (if (chan? result)
      (recur (a/<! result))
      (let [reply (if (and (map? result)
                           (::with-state result))
                    (do
                      (vreset! local (::local result))
                      (::value result))
                    result)]
        ;; Putting nil would throw inside the go block and leave `out`
        ;; open, making the asker wait for the full timeout.
        (when (some? reply)
          (a/>! out reply))
        (a/close! out)))))
(defn- initialize-loop
  "Launch the message loop of `service` and return its go channel.

  Each iteration waits on the stop channel and the inbox, giving the stop
  channel priority. A value from the inbox is handled to completion
  before the next iteration; a value from the stop channel ends the loop."
  [{:keys [::inbox-ch ::stop-ch] :as service}]
  (a/go-loop []
    (let [[message source] (a/alts! [stop-ch inbox-ch] :priority true)
          from-inbox?      (identical? source inbox-ch)]
      (if from-inbox?
        (do
          (a/<! (handle-message service message))
          (recur))
        nil))))
;; --- Example Code
;; An adder service
(def service-a
  ;; The local state is the `+` function itself; every message is summed
  ;; by applying that state to the message arguments.
  (service
   :receive (fn [add & numbers]
              (apply add numbers))
   :init (constantly +)))
;; Asynchronous counter service
(def service-b
  ;; The local state is a counter starting at 0. Each message replies with
  ;; the current counter and installs the incremented counter as new state.
  (service
   :receive (fn [counter & _]
              (a/go
                (let [pause (a/timeout 100)] ;; simulate some async work
                  (a/<! pause)
                  (with-state counter (inc counter)))))
   :init (constantly 0)))
;; Hierarchical service
(def service-c
  (service
   ;; Service initialization: create and start the two child services and
   ;; keep them as the local state.
   :init (fn [_config]
           (let [adder (service :receive (fn [_ & args] (apply + args)))
                 hello (service :receive (fn [_ & [name]] (str "Hello " name)))]
             {:adder (start! adder)
              :hello (start! hello)}))
   ;; Service resource cleaning: stop both child services.
   :stop (fn [{:keys [adder hello]}]
           (stop! adder)
           (stop! hello))
   ;; Service on-message hook: the first message argument selects the child
   ;; service, the remaining arguments are forwarded to it.
   :receive (fn [{:keys [adder hello]} & [op & params]]
              (a/go
                (case op
                  :add    (a/<! (apply ask! adder params))
                  :greets (a/<! (apply ask! hello params))
                  ;; Without a default clause `case` throws inside the go
                  ;; block and the asker never receives a meaningful reply.
                  (ex-info "unknown message" {:message op}))))))
;; Start the three example services, in order.
(doseq [example [service-a service-b service-c]]
  (start! example))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment