@janherich
Last active August 29, 2015 13:55
Input pipe channel
(ns async-util.input-pipe
  (:require [clojure.core.async.impl.protocols :as impl]
            [clojure.core.async :refer [<! >! close! go-loop]]))

(defprotocol InputPipe
  (detach* [p]))

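(defn input-pipe
  "Creates and returns an input pipe over the supplied channel ch,
  with the input channel ch-i attached."
  [ch ch-i]
  (let [i (atom ch-i) ; currently attached input channel, nil once detached
        p (reify
            ;; All channel operations delegate to the wrapped channel ch.
            impl/Channel
            (close! [_] (impl/close! ch))
            (closed? [_] (impl/closed? ch))
            impl/ReadPort
            (take! [_ fn1] (impl/take! ch fn1))
            impl/WritePort
            (put! [_ val fn1] (impl/put! ch val fn1))
            InputPipe
            (detach* [_] (if (impl/closed? ch)
                           false
                           (do (reset! i nil) true))))]
    ;; Forward values from the input channel into the pipe until the input
    ;; closes, a put on the pipe fails, or the input is detached.
    (go-loop [val (<! @i)]
      (if-let [i-c @i]
        (if (nil? val)
          (close! p)       ; input channel closed: close the pipe as well
          (when (>! p val) ; a falsey result from >! ends the loop
            (recur (<! i-c))))
        (reset! i nil)))
    p))
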
(defn detach-input
  "Detaches the input from an input-pipe channel. Returns false
  if the input-pipe channel is closed, true otherwise."
  [p]
  (detach* p))
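
;; Usage sketch (not part of the original gist). A minimal REPL session,
;; assuming a core.async version in which >! returns true on a successful
;; put. The names out, in and p are hypothetical and only illustrate how
;; input-pipe and detach-input might be combined.
(comment
  (require '[clojure.core.async :as a])

  (def out (a/chan 10))          ; channel wrapped by the pipe
  (def in  (a/chan 10))          ; input channel attached to the pipe
  (def p   (input-pipe out in))

  (a/>!! in :a)                  ; the go-loop forwards :a from in to out
  (a/<!! p)                      ; takes on p read from out

  (detach-input p)               ; stops forwarding; in itself stays open
  ;; The go-loop may already be parked on a take from in, so the next value
  ;; put on in after detaching appears to be consumed and discarded.

  (a/close! p))                  ; closes the wrapped channel out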