Forked from cgrand/foami.clj
Last active Sep 22, 2017
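(ns foami.core
  "FOreign Asynchronous Mechanism Interop"
  (:require [clojure.core.async :as async]))

(defn put!
  "Takes a `ch`, a `msg`, a single arg function that when passed `true` enables backpressure
  and when passed `false` disables it, and a no-arg function which, when invoked, closes the
  upstream source."
  [ch msg backpressure! close!]
  (let [status (atom :sending)]
    (async/put! ch msg
      (fn [result]
        (if-not result
          ;; `result` is false when `ch` is closed, so close the upstream source.
          (close!)
          ;; The put was accepted. If it completed before backpressure was
          ;; enabled, :sending -> :sent succeeds and nothing else is needed.
          ;; Otherwise the status is :paused: mark it :sent and lift backpressure.
          (when-not (compare-and-set! status :sending :sent)
            (when (compare-and-set! status :paused :sent)
              (backpressure! false))))))
    ;; Still :sending here means the put is pending: pause the upstream source.
    (when (compare-and-set! status :sending :paused)
      (backpressure! true))))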


@mpenet mpenet commented Oct 16, 2014



@cgrand cgrand commented Sep 22, 2017

Unearthing old stuff, but in case someone stumbles on this: with fan-in on ch (many callers putting on the same channel) this can throw:

=> (let [c (async/chan)]
     (dotimes [_ 2000] (put! c 42 identity identity)))
AssertionError Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)  clojure.core.async.impl.channels.ManyToManyChannel (channels.clj:152)

My original solution was convoluted (no use of :default, for a start), but the core point still holds: alt! (and its variants) is preferable to put!, because when the put doesn't succeed the callback is invalidated and doesn't take up a slot in the pending puts queue.
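
A rough sketch of that idea, not the original foami code: a put attempted through alts!! with a :default either completes right away or falls through to the default instead of waiting. The helper name try-put! and its :sent / :full / :closed return values are made up for this illustration.

```clojure
(require '[clojure.core.async :as async])

(defn try-put!
  "Hypothetical helper: attempts to put `msg` on `ch` without waiting.
  Returns :sent if the channel accepted it, :full if the put could not
  complete immediately, and :closed if `ch` is closed."
  [ch msg]
  ;; With :default supplied, alts!! returns immediately:
  ;; [true ch] or [false ch] if the put completed, [:full :default] otherwise.
  (let [[result port] (async/alts!! [[ch msg]] :default :full)]
    (cond
      (= port :default) :full
      result            :sent
      :else             :closed)))
```

A caller in the spirit of the gist would presumably enable backpressure on :full and retry once the consumer catches up, and invoke its close! function on :closed. For the single-channel case, recent core.async releases also provide async/offer!, which returns true only when the value is accepted immediately.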
