-
-
Save mccraigmccraig/b10156ed0b59de6ccc93dbb1115df7c9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn map-serially*
  "Map f over the values of stream s strictly one at a time - only a
   single application of f is in progress at any moment.

   Handy for working through a set of jobs serially, because
   controlling concurrency with stream buffering is somewhat inexact
   (a minimum of 2 ops in progress concurrently, or n+3 ops in
   progress once a buffer of n is applied).

   Built directly on manifold's reduce fn, which hands the deferred
   result of one reduce iteration straight to the next iteration.

   description is handed to pr/catch-error-log, which wraps the
   reduce. Returns the output stream; a callback chained onto the
   reduce result closes it."
  [description f s]
  (let [out (stream)
        ;; a single reduce iteration - prev is the deferred produced
        ;; by the previous iteration
        step (fn [prev v]
               (ddo [_ prev ;; extract the previous iteration's result
                     r (catch-stream-error
                        (f v)) ;; apply f and extract
                     ;; send the response to the output, backpressure
                     _ (put! out r)]
                 (return r)))] ;; wrap the response for the next iteration
    (-> (pr/catch-error-log
         description
         (st/reduce step
                    (return deferred-context true)
                    (->source s)))
        (d/chain (fn [_]
                   (close! out))))
    out))
(defmacro map-serially
  "Expands to a map-serially* call over f and s, supplying as the
   description a tag built at macroexpansion time from the current
   namespace and the :line / :column metadata of the calling form,
   shaped like \"<ns>:L<line>:C<column>\"."
  [f s]
  (let [{:keys [line column]} (meta &form)
        tag (str *ns* ":L" line ":C" column)]
    `(map-serially* ~tag ~f ~s)))
(defn map-concurrency-two
  "Map f over stream s, then pass the mapped stream through
   st/realize-each and return the result.

   NOTE(review): the name, together with the map-serially* docstring,
   suggests this arrangement leaves 2 ops in progress at once -
   confirm against manifold's behaviour."
  [f s]
  (st/realize-each (map f s)))
(comment
  ;; Evaluate the forms below at a REPL to convince yourself that the
  ;; minimum concurrency achievable with buffer/realize is
  ;; buffer-size+3 - presumably due to each stage in a stream chain
  ;; introducing an implicit buffer of 1.

  ;; the set of ops currently in progress
  (def ops-a (atom #{}))

  ;; one snapshot of the in-progress set per start/finish event
  (def ops-history-a (atom []))

  (def op (fn [v]
            (manifold.deferred/future
              (prn "starting:" v)
              ;; record the value returned by swap! rather than a
              ;; separate @ops-a: another op may start or finish between
              ;; the swap! and the deref, so a later deref can miss the
              ;; state this op actually produced (including the peak)
              (swap! ops-history-a conj (swap! ops-a conj v))
              (Thread/sleep (* 100 v))
              (prn "finishing;" v)
              (swap! ops-history-a conj (swap! ops-a disj v))
              (inc v))))

  ;; clear the history before each run
  (reset! ops-history-a [])

  ;; run the ops through map -> buffer -> realize-each and collect the
  ;; realized results
  @(->> [0 1 2 3 4 5 6 7 8 9]
        (manifold.stream/->source)
        (manifold.stream/map op)
        (manifold.stream/buffer 2)
        (manifold.stream/realize-each)
        (manifold.stream/reduce conj []))

  ;; the largest set in the history is the peak concurrency observed
  @ops-history-a
  )
(defn buffer-concurrency
  "Control the concurrency of s - a stream of deferred results of
   operations (probably produced by mapping a fn over descriptions of
   those operations) - by combining a buffer with realize-each.

   con is the requested concurrency: s is buffered with size
   (con - 3) and the buffered stream is passed to st/realize-each.
   Throws ex-info carrying {:con con} when con is below 3."
  [con s]
  (if (< con 3)
    (throw (ex-info "minimum concurrency is 3" {:con con}))
    ;; con is at least 3 on this branch, so the buffer size is never
    ;; negative
    (st/realize-each (st/buffer (- con 3) s))))
(defn map-concurrently
  "Uniform interface to all the concurrency options - map a fn f
   (which returns a deferred result) over a stream s, ensuring that
   there are a maximum of con unrealized results at any one time.

   con selects the strategy:
     1 - map-serially
     2 - map-concurrency-two
     otherwise - map f over s, then buffer-concurrency with con

   Throws ex-info carrying {:con con} when con is below 3 and is
   neither 1 nor 2 (zero, negative, or non-integer values)."
  [con f s]
  (case con
    1 (map-serially f s)
    2 (map-concurrency-two f s)
    (do
      ;; reject unsupported values before mapping over s, so an invalid
      ;; con never starts work on the source stream and the error does
      ;; not wrongly claim that the minimum concurrency is 3
      (when (< con 3)
        (throw (ex-info "concurrency must be a positive integer" {:con con})))
      (->> s
           (map f)
           (buffer-concurrency con)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment