Forked from ztellman/foami.clj
Created Sep 22, 2017
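(ns foami.core
  "FOreign Asynchronous Mechanism Interop"
  (:require [clojure.core.async :as async]))

(defn put!
  "Takes a `ch`, a `msg`, a single-arg function that enables backpressure when passed `true`
  and disables it when passed `false`, and a no-arg function which, when invoked, closes the
  upstream source."
  [ch msg backpressure! close!]
  ;; status moves :sending -> :sent (put accepted at once),
  ;; :sending -> :paused -> :sent (put had to wait), or :sending -> :closed.
  (let [status (atom :sending)]
    (async/put! ch msg
      (fn [result]
        (if-not result
          ;; `ch` is closed: mark the put as finished and shut down the upstream source.
          (do (reset! status :closed)
              (close!))
          ;; Accepted before we paused: nothing to undo. Otherwise lift the pause.
          (when-not (compare-and-set! status :sending :sent)
            (when (compare-and-set! status :paused :sent)
              (backpressure! false))))))
    ;; Still :sending here means the put is pending, so pause the upstream source.
    (when (compare-and-set! status :sending :paused)
      (backpressure! true))))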
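
;; A minimal usage sketch, not part of the original gist. It assumes the
;; four-argument `put!` above and stands in for a real foreign source with two
;; atoms. `paused?`, `closed?`, and the buffer size of 1 are hypothetical names
;; and values chosen only for illustration. Evaluate the forms one at a time
;; at a REPL.
(comment
  (def ch (async/chan 1))          ; room for exactly one buffered message
  (def paused? (atom false))       ; stand-in for the source's pause flag
  (def closed? (atom false))       ; stand-in for the source's closed flag

  (defn backpressure! [enable?] (reset! paused? enable?))
  (defn close! [] (reset! closed? true))

  ;; The buffer has room, so the put is accepted at once and nothing is paused.
  (put! ch :a backpressure! close!)
  @paused?

  ;; The buffer is now full, so this put stays pending and the source is paused.
  (put! ch :b backpressure! close!)
  @paused?

  ;; Taking a message frees the buffer. The pending put's callback runs on a
  ;; core.async dispatch thread, so `paused?` flips back shortly afterwards
  ;; rather than immediately.
  (async/<!! ch)
  @paused?

  ;; A put on a closed channel reports failure, which invokes `close!`.
  (async/close! ch)
  (put! ch :c backpressure! close!)
  @closed?)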