Interfacing synchronous systems with core.async

;; The libspec must be quoted when `require` is called as a function.
(require '[clojure.core.async
           :refer [chan dropping-buffer thread <!! >!! >!]])
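
(defmacro try!
  "Evaluates `body`. If it throws, puts the Throwable onto `ch`
  (when `ch` is non-nil) with `>!`, so it must be used inside a
  `go` block."
  [ch & body]
  `(let [ch# ~ch]
     (try ~@body
          (catch Throwable t#
            (when ch# (>! ch# t#))))))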
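
(defmacro try!!
  "Blocking variant of `try!`: puts the Throwable onto `ch` with
  `>!!`, for use on ordinary threads outside `go` blocks."
  [ch & body]
  `(let [ch# ~ch]
     (try ~@body
          (catch Throwable t#
            (when ch# (>!! ch# t#))))))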

(defn pool
  "Creates a pool of threads that take messages of the form
  `[ch f]` from a request channel, run `(f)`, and put the result
  onto `ch`.
  Returns the request channel.
  Takes optional keyword args:
  * `:in`, the channel on which to listen for messages
    (default: a new unbuffered channel)
  * `:err`, a channel to send exceptions that are encountered
  * `:size`, the number of threads in the pool (default: 1)
  Closing `in` will end processing and shut down the pool."
  [& opts]
  (let [{:keys [in err size]
         :or {in (chan) size 1}} opts]
    (dotimes [_ size]
      (thread
        ;; Loop until `in` is closed, i.e. until `<!!` returns nil.
        (while (try
                 (when-let [[out thunk] (<!! in)]
                   (try!! err (>!! out (thunk)))
                   true)
                 ;; Keep the worker alive if anything else throws.
                 (catch Throwable _ true)))))
    in))
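
;; A minimal sketch of the raw message protocol described in the
;; docstring above, assuming the definitions in this file are loaded.
;; The name `workers` is hypothetical and used only for illustration.
;; The form is wrapped in `comment`, so nothing runs when the file loads.
(comment
  (def workers (pool :size 2))

  ;; Send one `[ch f]` message by hand and block until a worker replies.
  (let [out (chan 1)]
    (>!! workers [out #(+ 1 2)])
    (<!! out))

  ;; Closing the request channel lets the worker threads exit.
  (clojure.core.async/close! workers))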
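
(defn async*
  "Function behind the `async` macro. Sends `thunk` to `pool` and
  returns the channel that will receive its result: `out` if given,
  otherwise a new channel with a buffer of 1. When `err` is non-nil,
  exceptions thrown by `thunk` are put onto `err`."
  [pool out err thunk]
  (let [out (or out (chan 1))
        thunk (if err #(try!! err (thunk)) thunk)]
    (>!! pool [out thunk])
    out))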

(defmacro async
  "Performs `body` asynchronously using `pool`, returning the result
  via a channel. The return channel may be specified with `:out ch`
  and an error channel can be provided using `:err ch`.
  Returns the return channel (whether provided or not).
  For example: `(<!! (async pool :err err-ch (Thread/sleep 100) 42))`"
  [pool & body]
  (loop [opts {}
         [k v & more :as body] body]
    (if (#{:out :err} k)
      (recur (assoc opts k v) more)
      (let [{:keys [out err]} opts]
        `(async* ~pool ~out ~err (fn [] ~@body))))))

(defn single-error-channel
  "Returns a channel that buffers a single error and drops further
  puts until that error has been taken."
  []
  (chan (dropping-buffer 1)))
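
;; An end-to-end sketch of how the pieces above might be combined,
;; assuming the definitions in this file are loaded. The names
;; `workers` and `errors` are hypothetical. What arrives on the
;; returned channel when the body throws depends on the core.async
;; version, so the failure case below reads only the error channel.
(comment
  (def workers (pool :size 4))
  (def errors (single-error-channel))

  ;; Success: the value of the last body form arrives on the
  ;; returned channel.
  (<!! (async workers :err errors (Thread/sleep 100) 42))

  ;; Failure: the exception is delivered on `errors` rather than
  ;; being thrown on the calling thread.
  (async workers :err errors (throw (ex-info "boom" {})))
  (.getMessage (<!! errors))

  ;; Shut the pool down when finished.
  (clojure.core.async/close! workers))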