Skip to content

Instantly share code, notes, and snippets.

@Chouser
Created September 9, 2012 21:52
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save Chouser/3687532 to your computer and use it in GitHub Desktop.
Save Chouser/3687532 to your computer and use it in GitHub Desktop.
Lazy seq as event subscription mechanism
;; Here is a spike of a lightweight in-process pubsub mechanism that allows pure ;; functional consumers, both blocking and asynchronous.
;; This defines the event stream, in this case just a series of numbers,
;; a new one produced each second
(defn timer []
(lazy-seq
(do
(Thread/sleep 1000)
(cons (System/nanoTime) (timer)))))
;; We can see some events (this takes a couple seconds to complete):
(take 3 (timer))
;=> (3383024932037 3384025272769 3385025571742)
;; We can have two consumers run concurrently, but they're actually
;; consuming two different streams, each having started their own,
;; so this isn't quite what we want:
[@(future (take 3 (timer))) @(future (take 3 (timer)))]
;=> [(3445765729693 3446765925828 3447766208228)
; (3448766569987 3449766774030 3450767042063)]
;; So we want to set up a single producer that can be observed in a stable way:
(def most-recent (atom (timer)))
(defn advance-most-recent [] (swap! most-recent rest) (recur))
;; There, now this kicks off a single producer:
(future (advance-most-recent))
;; Now we can have two consumers look at the same stream of events. Note the
;; event numbers are identical for each consumer:
[@(future (take 3 @most-recent)) @(future (take 3 @most-recent))]
;=> [(3735764451405 3736764670106 3737764892260)
(3735764451405 3736764670106 3737764892260)]
;; If one consumer starts late, it may miss some events, but will still see
;; a consistent and sequential view:
[@(future (take 3 @most-recent))
(do (Thread/sleep 1500) @(future (take 3 @most-recent)))]
;=> [(3756768628228 3757768831178 3758769051789)
; (3757768831178 3758769051789 3759769275472)]
;; Consumers could choose to use a watcher instead, for a callback or async-style
;; mechanism. This starts printing a new number every second:
(add-watch most-recent :async
(fn callback [_ _ _ stream]
(prn (first stream)))
;;==========================
;; If the producer wants to asynchronously drop items into the event stream,
;; we have to set it up a bit differently. First, a queue that the producer
;; can put events in:
(def producer-queue (java.util.concurrent.LinkedBlockingQueue.))
;; Then the atom for consumers to follow:
(def most-recent
(atom ((fn more [] (lazy-seq (cons (.take producer-queue) (more)))))))
;; The thread for advancing most-recent is just as above:
(defn advance-most-recent [] (swap! most-recent rest) (recur))
(future (advance-most-recent))
;; Now the producer can be written like this:
(future
(loop [i 0]
(Thread/sleep 1000)
(.put producer-queue i) ;; Publish the next event
(recur (inc i))))
;; The consumers also use exactly the same API as earlier, both async watchers
;; and blocking will work:
[@(future (take 3 @most-recent)) @(future (take 3 @most-recent))]
;=> [(45 46 47) (45 46 47)]
;; thanks to zenoli in #clojure irc for inspiration.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment