Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Utility functions for core.async 0.1.346.0-17112a-alpha to create a linear pipeline of go-routines in a Storm-esque way.
(ns your.namespace.async-util
(:require
[clojure.core.async :as a :refer [>! <! >!! <!! go chan buffer
close! thread alts! alts!! timeout]]
[clojure.core.match :as match :refer [match]]))
;;
;; core.async util fns
;;
(defn- linear-pipeline-spout-go-routine
"This is a private fn that, for a linear pipeline of go routines, will create
a go routine that is like a Storm spout. msg-seq is a seq of msgs to emit on the
output channel.
Also supports :take-num in config-map to limit the number of tuples emitted to the out-chan."
[config-map]
(let [{:keys [out-chan in-seq take-num]} config-map]
;; using map instead of pmap b/c any short latency (that is, I/O) should
;; be handled by downstream async routines. any really long latency (blocking)
;; should be done via thread!. if reading the seq is the bottlenck, then
;; we're doing something wrong -- we should try to offload that work in the async
;; pipeline.
(if take-num
(doall (map (fn [msg] (>!! out-chan msg)) (take take-num in-seq)))
(doall (map (fn [msg] (>!! out-chan msg)) in-seq)))
(close! out-chan)))
(defn- linear-pipeline-bolt-go-routine
"This is a private fn that, for a linear pipeline of go routines, will create
a go routine that is like a Storm bolt. The in and out channels are
in-chan, out-chan. The transformation fn is f, a 1-arg fn operating on values
from in-chan.
Also supports :take-num in config-map to limit the number of tuples emitted to the out-chan,
and :map-cat to indicate whether the fn in :f returns seqs that should be broken
into its constituent elements."
[config-map]
(let [{:keys [in-chan out-chan f parallelism map-cat take-num blocking-put]} config-map
instance-count (atom parallelism)]
(dotimes [i parallelism]
(go (loop [num-emit 0]
(let [v (<! in-chan)]
;; check if msg is valid (channel not closed) and we haven't
;; exceeded the msg limit (take-num)
(if (and v
(not (and take-num
(>= num-emit take-num))))
(do
;; check if we need to break out a seq into individual
;; msgs (map-cat) or not
(if map-cat
;; break out seq into individual msgs, handle emitted msg
;; counting and limits accordingly
(do (let [vals-to-emit (if take-num
(take (min (count v) (- take-num num-emit)) v)
v)]
(doseq [elem vals-to-emit]
(let [out-val (f elem)]
(if blocking-put
(>!! out-chan out-val)
(>! out-chan out-val))))
(recur (+ (count vals-to-emit)
num-emit))))
;; deal with input without any special breaking out / flattening
(do
(let [out-val (f v)]
(if blocking-put
(>!! out-chan out-val)
(>! out-chan out-val)))
(recur (inc num-emit)))))
;; close out process
(do
(swap! instance-count dec)
(when (zero? @instance-count)
(close! out-chan))))))))))
(defn- linear-pipeline-final-bolt-go-routine
"This is a private fn that, for a linear pipeline of go routines, will create
a go routine that is like a Storm bolt, but for the final bolt. The only channel
is the input in-chan. The fn f is provided to make the output values available for
final processing, so f is a 1-arg fn operating on values from in-chan. The fn stop-fn
is a final cleanup/teardown fn of 0-args.
Also supports :take-num in config-map to limit the number of tuples emitted to the out-chan,
and :map-cat to indicate whether the inputshould be broken into its
constituent elements before the fn :f operates on it/them."
[config-map]
(let [{:keys [in-chan f stop-fn parallelism map-cat take-num]} config-map
instance-count (atom parallelism)]
(dotimes [i parallelism]
(go (loop [num-emit 0]
(let [v (<! in-chan)]
;; check if msg is valid (channel not closed) and we haven't
;; exceeded the msg limit (take-num)
(if (and v
(not (and take-num
(>= num-emit take-num))))
(do
;; processing final output is optional, so check if fn provided
(when f
;; check if we need to break out a seq into individual
;; msgs (map-cat) or not
(if map-cat
;; break out seq into individual msgs, handle emitted msg
;; counting and limits accordingly
(do
(let [vals-to-emit (if take-num
(take (min (count v) (- take-num num-emit)) v)
v)]
(doseq [elem vals-to-emit]
(f elem))
(recur (+ (count vals-to-emit)
num-emit))))
;; deal with input without any special breaking out / flattening
(do
(f v)
(recur (inc num-emit))))))
;; close out process
(do
(swap! instance-count dec)
(when (zero? @instance-count)
(stop-fn))))))))))
(defn- create-go-routine-dispatch-function
"Using multimethods allows you to choose which fn to use (dispatch) based on
values at runtime. In OOP, dispatch is done exclusively on one paramter - the
type of the object (or: the output value you get when running .getClass on
the object in question). Here, we create our own fn to create our own output
value so that we can do dispatch with more flexibly and without needing objects.
This fn is to be used in the defmulti portion of the definition of the multi-method
create-go-routine."
[config-map]
(letfn [(is-present
[map-key]
(not (nil? (get config-map map-key))))]
(let [map-keys [:in-chan :in-seq :out-chan :f :stop-fn :parallelism]
all-is-present (mapv is-present map-keys)]
(match all-is-present
[false true true false _ false] :spout
[true false true true _ true] :bolt
[true _ false true true true] :final-bolt
:else (throw (Exception. (str "Cannot parse config-map passed to create-go-routine"
" where presence of " map-keys " is " (pr-str all-is-present))))))))
;; definition of the interface fn create-go-routine and how we will dispatch.
;; we will dispatch on which elements in the config map are present
(defmulti create-go-routine
"This fn creates a go routine for a linear async pipeline."
(fn [config-map] (create-go-routine-dispatch-function config-map)))
;; impl fns for multimethod create-go-routine
(defmethod create-go-routine :spout
[config-map]
(linear-pipeline-spout-go-routine config-map))
(defmethod create-go-routine :bolt
[config-map]
(linear-pipeline-bolt-go-routine config-map))
(defmethod create-go-routine :final-bolt
[config-map]
(linear-pipeline-final-bolt-go-routine config-map))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment