Skip to content

Instantly share code, notes, and snippets.

@mccraigmccraig
Created January 23, 2020 12:36
Show Gist options
  • Save mccraigmccraig/b10156ed0b59de6ccc93dbb1115df7c9 to your computer and use it in GitHub Desktop.
Save mccraigmccraig/b10156ed0b59de6ccc93dbb1115df7c9 to your computer and use it in GitHub Desktop.
(defn map-serially*
"apply a function f to each of the values in
stream s, strictly serially - only one application of f will
be in progress at a time
can be used to process a set of jobs serially, since using
stream buffering to control concurrency is somewhat inexact
(minimum of 2 ops in progress concurrently, or if you apply
a buffer then n+3 ops in progress)
internally makes direct use of manifold's reduce fn, which
passes a deferred reduce result straight to the next
reduce iteration"
[description f s]
(let [out (stream)]
(d/chain
(pr/catch-error-log
description
(st/reduce (fn [p v]
(ddo [_ p ;; extract p
r (catch-stream-error
(f v)) ;; apply f and extract
;; send the response to the output, backpressure
_ (put! out r)]
(return r))) ;; wrap the response for the next iteration
(return deferred-context true)
(->source s)))
(fn [_]
(close! out)))
out))
(defmacro map-serially
[f s]
(let [ns# *ns*
{l# :line
c# :column} (meta &form)
tag# (str ns# ":L" l# ":C" c#)]
`(map-serially* ~tag# ~f ~s)))
(defn map-concurrency-two
[f s]
(->> s
(map f)
(st/realize-each)))
(comment
;; use the following to convince yourself that the
;; minimum concurrency achievable with
;; buffer/realize is buffer-size+3 -
;; presumably due to each
;; stage in a stream chain introducing an implicit
;; buffer of 1
(def ops-a (atom #{}))
(def ops-history-a (atom []))
(def op (fn [v]
(manifold.deferred/future
(prn "starting:" v)
(swap! ops-a conj v)
(swap! ops-history-a conj @ops-a)
(Thread/sleep (* 100 v))
(prn "finishing;" v)
(swap! ops-a disj v)
(swap! ops-history-a conj @ops-a)
(inc v))))
(reset! ops-history-a [])
@(->> [0 1 2 3 4 5 6 7 8 9]
(manifold.stream/->source)
(manifold.stream/map op)
(manifold.stream/buffer 2)
(manifold.stream/realize-each)
(manifold.stream/reduce conj []))
@ops-history-a
)
(defn buffer-concurrency
"use realize-each and buffer to control the
concurrency of a stream of deferred results of
operations (probably produced by mapping a fn
over a description of the operation)"
[con s]
(when (< con 3)
(throw (ex-info "minimum concurrency is 3" {:con con})))
(->> s
(st/buffer (max 0 (- con 3)))
(st/realize-each)))
(defn map-concurrently
"uniform interface to all the concurrency options - map a fn
(which returns a deferred result) over a stream,
ensuring that there are a
maximum of con unrealized results at any one time"
[con f s]
(case con
1 (map-serially f s)
2 (map-concurrency-two f s)
(->> s
(map f)
(buffer-concurrency con))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment