;; Created January 23, 2020 12:36
(defn map-serially*
  "Apply a function f to each of the values in stream s, strictly
   serially - only one application of f will be in progress at a time.

   Can be used to process a set of jobs serially, since using stream
   buffering to control concurrency is somewhat inexact (minimum of 2
   ops in progress concurrently, or if you apply a buffer then n+3 ops
   in progress).

   Internally makes direct use of manifold's reduce fn, which passes a
   deferred reduce result straight to the next reduce iteration.

   Params:
     description - call-site tag (the map-serially macro passes a
                   \"ns:Lline:Ccolumn\" string); it is not consumed by
                   the code in this function
     f           - fn applied to each value taken from s
     s           - the input, converted with ->source

   Returns the output stream, onto which each result is put in input
   order, and which is closed once the reduction over s has completed."
  [description f s]
  (let [out (stream)]
    ;; The reduction is sequenced with the close! inside the let, so
    ;; that `out` is still in scope when the input is exhausted.
    (ddo [_ (st/reduce
             (fn [p v]
               (ddo [_ p                 ;; extract p: wait for the previous step
                     r (catch-stream-error
                        (f v))           ;; apply f and extract
                     ;; send the response to the output, backpressure
                     _ (put! out r)]
                 (return r)))            ;; wrap the response for the next iteration
             (return deferred-context true)
             (->source s))]
      ;; NOTE(review): if the reduce deferred itself fails, this step is
      ;; presumably skipped and `out` stays open - confirm whether an
      ;; error handler is wanted here.
      (return (close! out)))
    ;; hand the output stream back immediately; results arrive on it
    ;; as the reduction progresses
    out))
(defmacro map-serially
  "Expands to a call of map-serially* on f and s, prefixed with a tag
   string identifying the call site: the current namespace followed by
   the :line and :column metadata of the macro form, formatted as
   \"<ns>:L<line>:C<column>\"."
  [f s]
  (let [{:keys [line column]} (meta &form)
        call-site (str *ns* ":L" line ":C" column)]
    `(map-serially* ~call-site ~f ~s)))
(defn map-concurrency-two
  "Map f over stream s and realize each result, with no additional
   buffering stage. Per the notes elsewhere in this file, an unbuffered
   stream chain like this leaves a minimum of 2 ops in progress
   concurrently - hence the name.

   Params:
     f - fn applied to each value of s
     s - the input stream

   Returns the stream of realized results."
  [f s]
  (->> s
       (map f)
       ;; NOTE(review): the source form was left unclosed after (map f);
       ;; the realize step is reconstructed from the buffer-concurrency
       ;; docstring (\"use realize-each and buffer\") - confirm.
       (st/realize-each)))
;; use the following to convince yourself that the
;; minimum concurrency achievable with
;; buffer/realize is buffer-size+3 -
;; presumably due to each
;; stage in a stream chain introducing an implicit
;; buffer of 1
;; Scratch state and driver for the concurrency experiment described in
;; the comment above.

;; ops-a: the set of values whose op is currently in progress.
(def ops-a (atom #{}))
;; ops-history-a: a snapshot of ops-a appended at every op start and
;; every op finish, so the peak set size shows the peak concurrency.
(def ops-history-a (atom []))
;; op: marks v as in progress, sleeps for (* 100 v) ms, marks v as
;; finished, and evaluates to (inc v).
;; NOTE(review): the final line of this form carries one more closing
;; paren than the def/fn opened here - presumably a wrapping form around
;; the body (e.g. one producing a deferred) was lost. TODO confirm
;; against the original before evaluating.
(def op (fn [v]
(prn "starting:" v)
(swap! ops-a conj v)
(swap! ops-history-a conj @ops-a)
(Thread/sleep (* 100 v))
(prn "finishing;" v)
(swap! ops-a disj v)
(swap! ops-history-a conj @ops-a)
(inc v))))
;; clear the recorded history before a fresh run
(reset! ops-history-a [])
;; Run ten values through the pipeline and deref the result.
;; NOTE(review): each threading step below is missing its function head
;; (a list starting with whitespace); presumably these were a stream
;; map over op, a buffer of 2, a realize step and a reduce with
;; conj into [] - TODO restore from the original, as written this
;; does not evaluate as intended.
@(->> [0 1 2 3 4 5 6 7 8 9]
( op)
( 2)
( conj []))
(defn buffer-concurrency
  "Use realize-each and buffer to control the concurrency of a stream
   of deferred results of operations (probably produced by mapping a fn
   over a description of the operation).

   Params:
     con - the desired concurrency; must be at least 3
     s   - stream of deferred results

   Returns the stream of realized results.

   Throws ex-info \"minimum concurrency is 3\" (with {:con con} as data)
   when con is below 3."
  [con s]
  (when (< con 3)
    (throw (ex-info "minimum concurrency is 3" {:con con})))
  (->> s
       ;; buffer size is con minus 3; the guard above already ensures
       ;; this is non-negative, max is kept as a belt-and-braces floor
       (st/buffer (max 0 (- con 3)))
       ;; the source form was left unclosed after the buffer step; the
       ;; realize step named in the docstring is restored here
       (st/realize-each)))
(defn map-concurrently
  "Single entry point for all of the concurrency strategies: maps f
   (a fn returning a deferred result) over stream s while keeping at
   most con results unrealized at any one time.

   Dispatch on con:
     1         - map-serially
     2         - map-concurrency-two
     otherwise - map f over s, then buffer-concurrency with con (which
                 throws for any con below 3)"
  [con f s]
  (case con
    1 (map-serially f s)
    2 (map-concurrency-two f s)
    ;; default branch: every other value of con
    (buffer-concurrency con (map f s))))
