(ns favila.async-util
  "Some missing pieces of core.async.

  as-transducer: Make a transducer function easily without Clojure 1.7.
  go-pipe: async/pipe, but returns the go-loop.
  fast-pipeline-blocking: faster than async/pipeline-blocking, but unordered results.
  blocking-consumer: consume a channel with multiple worker threads."
  (:require [clojure.core.async :as async
             :refer [go go-loop <! >! <!! >!! close! take! put! chan]]))
(defn as-transducer
  "Convert function `f` into a transducer function.

  This is merely a more convenient way to write an efficient reducing-step
  function without the transducer signature.

  `f` should have the signature `(f appender opaque-collection value)`
  and call `(appender opaque-collection x)` to add each output value `x` to
  the final reduced value. It should return the \"modified\" opaque collection
  like a normal reducing function. It can also box it with `reduced` to stop
  early."
  [f]
  (fn [xf]
    (fn
      ([] (xf))
      ([r] (xf r))
      ([r v] (f xf r v)))))
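;; Usage sketch for `as-transducer` (not part of the original gist). The names
;; `keep-even` and `take-until-negative` are hypothetical, and the
;; `(into [] xf coll)` calls assume Clojure 1.7+ so the result can be checked
;; without a channel.
(comment
  ;; A filter-like step: append only even values, otherwise return the
  ;; accumulator unchanged.
  (def keep-even
    (as-transducer
      (fn [append acc v]
        (if (even? v) (append acc v) acc))))

  (into [] keep-even (range 10))
  ;; => [0 2 4 6 8]

  ;; Stop early by boxing the accumulator with `reduced`.
  (def take-until-negative
    (as-transducer
      (fn [append acc v]
        (if (neg? v) (reduced acc) (append acc v)))))

  (into [] take-until-negative [1 2 -1 3])
  ;; => [1 2]
  )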
(defn go-pipe
  "Like async/pipe, but returns the channel for the pipe's go block instead of
  `to`. This is so we can monitor the piping \"process\" directly."
  [from to close?]
  (go-loop [v (<! from)]
    (if (nil? v)
      (when close? (close! to))
      (when (>! to v)
        (recur (<! from))))))
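;; Usage sketch for `go-pipe` (not part of the original gist). `to` is given a
;; buffer large enough to hold every item, so waiting on the returned channel
;; before reading `to` cannot deadlock in this example.
(comment
  (let [from (async/to-chan [1 2 3])
        to   (chan 10)
        done (go-pipe from to true)]
    ;; `done` closes once `from` is drained and `to` has been closed.
    (<!! done)
    (<!! (async/into [] to)))
  ;; => [1 2 3]
  )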
(defn fast-pipeline-blocking
  "Like `pipeline-blocking`, except there is no guaranteed relative order
  between inputs and outputs, and the return value is a read-only channel which
  closes when all workers are finished.

  The advantage of this over `pipeline-blocking` is increased throughput;
  the disadvantage is that results come in no defined order.

  Spawns `n` worker threads, each of which applies transducer `xf` to an
  item from `from` and puts the results on `to`.

  If `close?` is true (default true), `to` will be closed when `from` is closed
  and *after* flushing all workers (i.e. no items already created by workers
  for `to` will be lost)."
  ([n to xf from] (fast-pipeline-blocking n to xf from true))
  ([n to xf from close?]
   (fast-pipeline-blocking n to xf from close?
                           (fn [ex]
                             (-> (Thread/currentThread)
                                 .getUncaughtExceptionHandler
                                 (.uncaughtException (Thread/currentThread) ex))
                             nil)))
  ([n to xf from close? ex-handler]
   (assert (pos? n))
   (let [make-worker (fn []
                       (async/thread
                         (let [xfc (chan 1 xf ex-handler)
                               pipes (async/merge [(go-pipe from xfc true)
                                                   (go-pipe xfc to false)])]
                           ;; This will block until from and xfc are closed.
                           (<!! pipes))))
         workers (async/merge (repeatedly n make-worker))]
     (if close?
       (go (<! workers) (close! to) nil)
       workers))))
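;; Usage sketch for `fast-pipeline-blocking` (not part of the original gist).
;; It assumes Clojure 1.7+ for the `(map inc)` transducer. Output order is
;; undefined, so the result is sorted before inspection. `to` is buffered to
;; hold all ten results, which makes it safe to wait on `done` first.
(comment
  (let [from (async/to-chan (range 10))
        to   (chan 10)
        done (fast-pipeline-blocking 4 to (map inc) from)]
    ;; `done` closes after every worker has finished and `to` has been closed.
    (<!! done)
    (sort (<!! (async/into [] to))))
  ;; => (1 2 3 4 5 6 7 8 9 10)
  )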
(defn blocking-consumer
  "Spawn `n` threads, each of which will read from channel `ch` and call
  `f` with the read value as its only argument. Presumably, `f` will cause a
  side effect with the value.

  If `f` does something long-running, it should block to exert back-pressure
  on `ch`.

  Returns a channel which closes when all workers close. All workers will close
  when `ch` closes."
  [n ch f]
  (letfn [(make-worker []
            (async/thread
              (loop [v (<!! ch)]
                (when-not (nil? v)
                  (f v)
                  (recur (<!! ch))))))]
    (async/merge (repeatedly n make-worker))))
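;; Usage sketch for `blocking-consumer` (not part of the original gist). The
;; atom `seen` is a hypothetical stand-in for a real side effect such as a
;; database write. Workers run concurrently, so the collected order is
;; undefined and the result is sorted before inspection.
(comment
  (let [seen (atom [])
        ch   (async/to-chan (range 5))
        done (blocking-consumer 2 ch (fn [v] (swap! seen conj v)))]
    ;; `done` closes once `ch` is closed and both workers have exited.
    (<!! done)
    (sort @seen))
  ;; => (0 1 2 3 4)
  )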