Skip to content

Instantly share code, notes, and snippets.

@ztellman
Forked from cgrand/foami.clj
Last active September 22, 2017 16:05
Show Gist options
  • Save ztellman/fb64e81d1d7f0b261ccd to your computer and use it in GitHub Desktop.
Save ztellman/fb64e81d1d7f0b261ccd to your computer and use it in GitHub Desktop.
(ns foami.core
"FOreign Asynchronous Mechanism Interop"
(:require [clojure.core.async :as async]))
(defn put!
"Takes a `ch`, a `msg`, a single arg function that when passed `true` enables backpressure
and when passed `false` disables it, and a no-arg function which, when invoked, closes the
upstream source."
[ch msg backpressure! close!]
(let [status (atom :sending]
(async/put! ch msg
(fn [result]
(if-not result
(close!)
(cond
(compare-and-set! status :sending :sent)
nil
(compare-and-set! status :paused :sent)
(backpressure! false)))))
(when (compare-and-set! status :sending :paused)
(backpressure! true))
nil))
@mpenet
Copy link

mpenet commented Oct 16, 2014

@cgrand
Copy link

cgrand commented Sep 22, 2017

unearthing old stuff but in case someone stumbles on this: in case of fan-in on ch this could throw:

=> (let [c (async/chan)]
     (dotimes [_ 2000] (put! c 42 identity identity)))
AssertionError Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)  clojure.core.async.impl.channels.ManyToManyChannel (channels.clj:152)

My original solution was convoluted (no use of :default for a start), but the core is still true: alt! (and variants) are preferable to put!: when it doesn't succeed the callback is invalidated and doesn't take place in the pending writes queue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment