Skip to content

Instantly share code, notes, and snippets.

@viperscape
Last active August 29, 2015 14:01
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save viperscape/f76dc46b27d21cb131cf to your computer and use it in GitHub Desktop.
Save viperscape/f76dc46b27d21cb131cf to your computer and use it in GitHub Desktop.
many-to-many broadcast stream
(defn v-> [s v]
"pushes value onto stack, if possible, returns new head/tail"
(let [p (promise)]
(if (deliver s (cons v (list p)))
p)))
(defn v<- [s]
"returns lazy sequence of actual values, without head/tail"
(when (realized? s)
(cons (first @s)
(lazy-seq (v<- (second @s))))))
(defn v<-* [s]
"returns lazy sequence, including head/tail"
(if (realized? s)
(cons (first @s)
(lazy-seq (v<-* (second @s))))
(lazy-seq (list s))))
(def >*
"somewhat faster than using (last (v<-* s))"
(fn [s]
(if-not (realized? s)
s
(recur (second @s)))))
(defn v->* [s v]
"finds head/tail if needed, loops until this happens, eventually pushes value onto stack"
(loop [_s s]
(if-let [_ (v-> _s v)]
_
(recur (>* _s)))))
;; example usage
(let [s (promise)]
(.start(Thread. #(reduce v->* s (range 6)))) ;; #<core$promise$reify__6363@14a0a721: :pending
(v->* s -1)
(Thread/sleep 10)
(prn (take 2 (v<- s))) ;; (-1 0)
(prn (take 8 (drop 2 (v<-* s)))) ;; (1 2 3 4 5 #<core$promise$reify__6363@14a0a721: :pending>)
(prn (drop-last (v<-* s))) ;; (-1 0 1 2 3 4 5)
(prn (last (v<-* s)))) ;; #<core$promise$reify__6363@14a0a721: :pending
@viperscape
Copy link
Author

It should be noted that it'd be better to only have one producer thread, as this reduces the race to extending the stream.

(drop-last (v<-* s)) is technically slower than (v<- s)

@viperscape
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment