@dpsutton
Last active May 17, 2020 16:58
ghadi's CompletableFuture helpers from Slack
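(require '[clojure.core.async :as async])
(import '(java.util.concurrent CompletableFuture))

(defn ->Function
  "Wraps a Clojure fn as a java.util.function.Function."
  [f]
  (reify java.util.function.Function
    (apply [_ obj] (f obj))))

(defn future-map
  "Returns a new CompletableFuture holding (f result-of-cf)."
  [^CompletableFuture cf f]
  (.thenApply cf (->Function f)))

(defn future->ch
  "Puts the result of cf onto ch when cf completes. A failure is put as a
  :cognitect.anomalies/fault map carrying the Throwable. The 1-arity
  creates its own channel and closes it afterwards; when ch is passed in,
  it is closed only if close? is true."
  ([cf]
   (future->ch cf (async/chan 1) true))
  ([cf ch]
   (future->ch cf ch false))
  ([^CompletableFuture cf ch close?]
   (.whenComplete cf (reify java.util.function.BiConsumer
                       (accept [_ obj t]
                         ;; put! throws on nil, so a nil result is skipped
                         (when-some [ret (if t
                                           {:cognitect.anomalies/category
                                            :cognitect.anomalies/fault
                                            :throwable t}
                                           obj)]
                           (async/put! ch ret))
                         (when close? (async/close! ch)))))
   ch))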
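;; and consumer (refers to the helpers above through a `util` alias):
(require '[clojure.java.io :as io]
         '[clojure.data.json :as json]) ; assumed: json/read is clojure.data.json
(import '(java.nio.file Path))

(defn ffprobe
  "Returns a channel that receives the ffprobe result map for file p."
  [^Path p]
  (let [args ["ffprobe"
              "-loglevel" "warning"
              "-print_format" "json"
              "-show_streams"
              "-show_format"
              (.. p toAbsolutePath toString)]
        ;; Called once the process has exited. Output is only read here,
        ;; so output larger than the OS pipe buffer would block the child
        ;; before it could exit.
        handle (fn [^Process proc]
                 (let [err (.getErrorStream proc)
                       stdout (.getInputStream proc)
                       exit (.exitValue proc)
                       ret {:path p
                            :ffmpeg/error (slurp err)}]
                   (if (zero? exit)
                     (merge ret (-> stdout io/reader json/read))
                     (assoc ret :exit exit :stdout (slurp stdout)))))]
    (-> (.. (ProcessBuilder. ^java.util.List args) start onExit)
        (util/future-map handle)
        (util/future->ch))))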
(defn pipeline
  "Runs asynchronous function 'af' on each input from channel 'in',
  producing results to channel 'out'. af is presumed to return a channel
  that yields a single result. Input order is *not* preserved.

  Runs af with maximum 'max' concurrency. max can be an integer or a
  function returning an integer (allowing dynamic concurrency control).

  close?, default true, controls whether the output channel is closed
  upon completion."
  ([max af in out]
   (pipeline max af in out true))
  ([max af in out close?]
   (let [max (if (int? max) (constantly max) max)
         ;; listen on `in` only while fewer than (max) tasks are in flight
         reads #(cond-> % (< (count %) (max)) (conj in))
         ;; once `in` is closed, forward the remaining results, then close out
         drain (fn [tasks]
                 (async/go
                   (loop [tasks tasks]
                     (when (seq tasks)
                       (let [[v sc] (async/alts! (vec tasks))]
                         (when (some? v) (async/>! out v))
                         (recur (disj tasks sc)))))
                   (when close? (async/close! out))))]
     (async/go
       (loop [tasks #{}]
         (let [chs (vec (reads tasks))]
           (when (pos? (count chs))
             (let [[v ch] (async/alts! chs)]
               (if (= ch in)
                 (if v
                   ;; new input: start a task and track its result channel
                   (recur (conj tasks (af v)))
                   (async/<! (drain (disj tasks in))))
                 (do
                   ;; a task finished: forward its value, stop tracking it
                   (when (some? v) (async/>! out v))
                   (recur (disj tasks ch))))))))))))
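Because `future->ch` turns a failed future into a map tagged with `:cognitect.anomalies/category` instead of throwing, whatever reads the results channel has to tell failures from successes. A minimal sketch of one way to do that, assuming core.async is on the classpath; `anomaly?`, `route-results` and `collect-routed` are hypothetical names, not part of the gist, and `async/split` is just one possible choice:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: future->ch marks failures with this key.
(defn anomaly? [x]
  (and (map? x) (contains? x :cognitect.anomalies/category)))

(defn route-results
  "Hypothetical helper. Splits ch into {:ok ch, :failed ch}. async/split
  parks while the side it is putting to has no taker, so both outputs
  must be consumed concurrently."
  [ch]
  (let [[failed ok] (async/split anomaly? ch)]
    {:ok ok :failed failed}))

(defn collect-routed
  "Hypothetical helper. Drains both sides at once and returns
  {:ok [...] :failed [...]}."
  [ch]
  (let [{:keys [ok failed]} (route-results ch)
        ;; each async/into runs in its own go block, so neither side stalls
        ok-v     (async/into [] ok)
        failed-v (async/into [] failed)]
    {:ok (async/<!! ok-v) :failed (async/<!! failed-v)}))
```

Reading only `:ok` to completion before touching `:failed` could stall as soon as the first anomaly arrives, which is why the sketch collects both sides concurrently.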
@dpsutton

that is not what I'm saying
use whatever thread pools you want, but I coordinate processes using channels
I wrote a process that crawls a filesystem, looking for all video files, and launches ffmpeg using the above code to extract metadata from all the videos.
the process runs at most 20 ffmpeg calls in parallel
it lights up the cores:
[screenshot: Screen Shot 2020-05-10 at 5.08.02 PM.png]
the Clojure process itself doesn't even register in that view
one thread walks the filesystem and pumps all videos encountered onto a channel
another process in the middle takes video files and shells out to ffmpeg, subject to the concurrency limit
and it places its result maps on a third channel
that channel contains Datomic transaction data
so I can answer questions like "What files have h.264 streams that are more than 720p?", etc.
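The author answers this kind of question through Datomic. As a stand-in, here is a plain-Clojure sketch that filters the result maps produced by `ffprobe` above instead. It assumes the string keys that `clojure.data.json/read` yields by default and ffprobe's per-stream `codec_name` and `height` fields; `hd-h264?` and `hd-h264-paths` are hypothetical names:

```clojure
(defn hd-h264?
  "True if any stream in an ffprobe result map is H.264 with a height
  above 720 lines."
  [result]
  (boolean
    (some (fn [stream]
            (and (= "h264" (get stream "codec_name"))
                 (> (get stream "height" 0) 720)))
          (get result "streams"))))

(defn hd-h264-paths
  "Paths of all results that have an H.264 stream above 720p. Failed
  probes carry no \"streams\" key and are skipped."
  [results]
  (->> results (filter hd-h264?) (map :path)))
```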
wherever I said "ffmpeg" I could replace that with "http request"
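The three stages described above (a pump, a concurrency-limited middle, a results channel) can be sketched end to end with core.async, assuming it is on the classpath. Everything named here is hypothetical: `fake-probe` stands in for the ffmpeg call (or an HTTP request), and `bounded-workers` limits concurrency with a fixed pool of n go loops. That is a swapped-in technique, not ghadi's single-go-block `pipeline`. The two atoms exist only to show the limit holding.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical instrumentation: probes currently running, and the peak.
(def in-flight (atom 0))
(def peak (atom 0))

(defn fake-probe
  "Hypothetical stand-in for shelling out to ffmpeg: returns a channel that
  receives one result map. async/thread simulates blocking work."
  [x]
  (async/thread
    (swap! peak max (swap! in-flight inc))
    (Thread/sleep 5)
    (swap! in-flight dec)
    {:input x :result (* x x)}))

(defn bounded-workers
  "Hypothetical limiter: n go loops each take a value from in, call af, and
  park until its single result arrives, so at most n calls are in flight.
  Output order is not preserved. Closes out once in is closed and drained."
  [n af in out]
  (let [workers (mapv (fn [_]
                        (async/go-loop []
                          (when-some [v (async/<! in)]
                            (when-some [r (async/<! (af v))]
                              (async/>! out r))
                            (recur))))
                      (range n))]
    (async/go
      (loop [ws workers]
        (when-some [w (first ws)]
          (async/<! w)
          (recur (rest ws))))
      (async/close! out))
    out))

(defn run-demo
  "Stage 1: a thread pumps 0..49 onto in (standing in for the filesystem
  walk). Stage 2: at most 4 probes run at once. Stage 3: the calling
  thread collects every result map."
  []
  (let [in  (async/chan 10)
        out (async/chan 10)]
    (async/thread
      (doseq [x (range 50)] (async/>!! in x))
      (async/close! in))
    (bounded-workers 4 fake-probe in out)
    (async/<!! (async/into [] out))))
```

Swapping `fake-probe` for a function that issues an HTTP request and returns a channel leaves the other stages unchanged, which is the point of the remark above.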

@dpsutton

In the system I mentioned above:
the filesystem pump uses async/thread
the ffmpeg stuff is a CompletableFuture that dumps onto a channel
the concurrency limiter in the middle is a single go block
the main thread reads the results of video metadata extraction
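A minimal sketch of the filesystem pump, assuming core.async is on the classpath and JDK 8+ for `Files/walk`. The extension set and the names `video-file?` and `pump-videos!` are hypothetical, since the source does not say how video files are recognized. It uses `async/thread` with a blocking `>!!` because walking a directory tree is blocking I/O that should stay off the go-block thread pool.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.string :as str])
(import '(java.nio.file Files FileVisitOption LinkOption Path))

;; Hypothetical: the source does not say how video files are recognized.
(def video-extensions #{"mp4" "mkv" "mov" "avi"})

(defn video-file?
  [^Path p]
  (and (Files/isRegularFile p (make-array LinkOption 0))
       (let [file-name (str/lower-case (str (.getFileName p)))]
         (boolean (some #(str/ends-with? file-name (str "." %))
                        video-extensions)))))

(defn pump-videos!
  "Walks root on its own thread, puts every video Path onto ch with a
  blocking >!!, and closes ch when the walk ends (normally or not)."
  [^Path root ch]
  (async/thread
    (try
      (with-open [^java.util.stream.Stream paths
                  (Files/walk root (make-array FileVisitOption 0))]
        (doseq [p (iterator-seq (.iterator paths))]
          (when (video-file? p)
            (async/>!! ch p))))
      (finally
        (async/close! ch)))))
```

The channel filled this way could serve as the `in` argument of the gist's `pipeline`, for instance with `ffprobe` as `af` and 20 as the limit.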
