(ns foami.core
  "FOreign Asynchronous Mechanism Interop"
  (:require [clojure.core.async :as async]))

(defn put!
  "Takes a `ch`, a `msg`, a single arg function that when passed `true` enables backpressure
  and when passed `false` disables it, and a no-arg function which, when invoked, closes the
  upstream source."
  [ch msg backpressure! close!]
  (let [status (atom :sending)]
    (async/put! ch msg
      (fn [result]
        (if-not result
          ;; `result` is false when `ch` is closed: shut down the upstream source.
          (close!)
          ;; The put completed: mark it :sent, releasing backpressure if it was applied.
          (when-not (compare-and-set! status :sending :sent)
            (when (compare-and-set! status :paused :sent)
              (backpressure! false))))))
    ;; Still :sending at this point means the put is pending: pause the upstream source.
    (when (compare-and-set! status :sending :paused)
      (backpressure! true))))

@mpenet mpenet commented Oct 16, 2014


@cgrand cgrand commented Sep 22, 2017

Unearthing old stuff, but in case someone stumbles on this: with fan-in on `ch`, this could throw:

=> (let [c (async/chan)]
     (dotimes [_ 2000] (put! c 42 identity identity)))
AssertionError Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)  clojure.core.async.impl.channels.ManyToManyChannel (channels.clj:152)

My original solution was convoluted (no use of `:default`, for a start), but the core point still holds: `alt!` (and its variants) are preferable to `put!`. When the operation doesn't succeed, the callback is invalidated and doesn't take up a slot in the pending writes queue.
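
To illustrate the point about `:default`, here is a minimal sketch, not the gist's actual replacement. It assumes a blocking caller thread, so it uses `alts!!`, and `try-put!` is a hypothetical name. It attempts the put once and reports what happened instead of leaving a pending write behind:

```clojure
(require '[clojure.core.async :as async])

(defn try-put!
  "Hypothetical helper: tries to put `msg` on `ch` without waiting.
  Returns :sent if the put completed immediately, :full if it could not
  (nothing stays queued on the channel), and :closed if `ch` is closed."
  [ch msg]
  (let [[v port] (async/alts!! [[ch msg]] :default ::none)]
    (cond
      (= port :default) :full    ; no operation was ready, :default was taken
      v                 :sent    ; a put returns true on success
      :else             :closed)))
```

A caller could enable backpressure on `:full`, retry once the consumer catches up, and close the upstream source on `:closed`. Repeating the fan-in experiment with `(dotimes [_ 2000] (try-put! c 42))` should not hit the 1024 pending puts limit, since failed attempts are not kept as pending puts.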

