@ztellman ztellman/foami.clj forked from cgrand/foami.clj
Last active Sep 22, 2017

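(ns foami.core
  "FOreign Asynchronous Mechanism Interop"
  (:require [clojure.core.async :as async]))

(defn put!
  "Takes a `ch`, a `msg`, a single-arg function that enables backpressure when passed
  `true` and disables it when passed `false`, and a no-arg function which, when invoked,
  closes the upstream source."
  [ch msg backpressure! close!]
  ;; status moves :sending -> :sent (put completed at once), or
  ;; :sending -> :paused -> :sent (put was pending, so backpressure was applied).
  (let [status (atom :sending)]
    (async/put! ch msg
      (fn [result]
        (if-not result
          ;; `result` is false when `ch` is closed: shut down the upstream source.
          (close!)
          (cond
            ;; Completed before the caller could pause: nothing to undo.
            (compare-and-set! status :sending :sent)
            nil
            ;; Completed after backpressure was enabled: release it.
            (compare-and-set! status :paused :sent)
            (backpressure! false)))))
    ;; The callback has not run yet, so the put is still pending: pause upstream.
    (when (compare-and-set! status :sending :paused)
      (backpressure! true))
    nil))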

mpenet commented Oct 16, 2014

cgrand commented Sep 22, 2017

Unearthing old stuff, but in case someone stumbles on this: with fan-in on ch, this can throw:

=> (let [c (async/chan)]
     (dotimes [_ 2000] (put! c 42 identity identity)))
AssertionError Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)  clojure.core.async.impl.channels.ManyToManyChannel (channels.clj:152)

My original solution was convoluted (no use of :default, for a start), but the core point still holds: alt! (and its variants) are preferable to put!, because when the put doesn't succeed, the callback is invalidated and doesn't take up a place in the pending writes queue.
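
To make the point above concrete, here is a minimal sketch of a non-blocking put built on `alts!!` with `:default`. The helper name `try-put!` and its return keywords are hypothetical (they are not part of the gist or of core.async), and the sketch assumes it is called from an ordinary thread rather than inside a `go` block.

```clojure
(require '[clojure.core.async :as async])

(defn try-put!
  "Hypothetical helper (not part of the gist): attempts to put `msg` on `ch`
  without waiting, using `alts!!` with a `:default`.
  Returns :sent, :closed, or :would-block."
  [ch msg]
  (let [[v port] (async/alts!! [[ch msg]] :default nil)]
    (cond
      (= port :default) :would-block  ; no taker or buffer space right now
      v                 :sent         ; the put completed
      :else             :closed)))    ; the channel was already closed

;; 2000 attempts on an unbuffered channel with no taker: each attempt falls
;; through to :default rather than being left behind as a pending put.
(def results
  (let [c (async/chan)]
    (doall (repeatedly 2000 #(try-put! c 42)))))
```

A caller could treat `:would-block` as the cue to enable backpressure on the upstream source and retry later, instead of queueing one callback per message as `put!` does.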
