Skip to content

Instantly share code, notes, and snippets.

@gre
Last active August 29, 2015 14:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gre/f85113cada9d73ae8ad5 to your computer and use it in GitHub Desktop.
Save gre/f85113cada9d73ae8ad5 to your computer and use it in GitHub Desktop.
(ns channels.core
(:require
[clojure.core.async :as async :refer [>! <! filter< alts! sliding-buffer tap mult chan thread timeout go go-loop alts!!]]
)
(:gen-class))
(defn filtered-channel [chan-input predicate & [buf-or-n]]
(let [c (if buf-or-n (chan buf-or-n) (chan))]
(tap chan-input c)
(filter< predicate c)))
(defn time-bufferize [in t]
(let [c (chan)]
(go
(loop
[buf []
timer (timeout t)]
(let [[value port] (alts! [in timer])
timed-out (= port timer) ]
(if (= port timer) ; timed out
(do
(>! c buf)
(recur [] (timeout t)))
(recur (conj buf value) timer)))))
c))
(defn twitter-stream-channel [keywords]
(let [c (chan)]
(go-loop
[i 1]
(let [tweet {:text (str i ": Dude I love " (rand-nth keywords))}]
(println (str "receive #" i))
(>! c tweet)
(<! (timeout (rand 1000))))
(recur (inc i)))
c))
(defn tweet-matches-topic [tweet topic]
(if-let [ text (.toLowerCase (:text tweet)) ]
(some #(.contains text %) (:keywords topic))))
(defn topic-channel [topic tweets-mult]
(filtered-channel tweets-mult #(tweet-matches-topic % topic) (sliding-buffer 100)))
(defn -main [& args]
(let
[
topics [{:id "functional"
:keywords ["ocaml" "clojure" "scala"]
:rate 2000}
{:id "oo"
:keywords ["java" "scala"]
:rate 5000
}]
keywords (flatten (map :keywords topics))
tweets-mult (mult (twitter-stream-channel keywords))
]
(doseq [topic topics]
(let [stream (topic-channel topic tweets-mult)
rate (:rate topic)
buffered (time-bufferize stream rate) ]
(go-loop
[]
(let [tweets (<! buffered)]
(println (str (:id topic) ": " tweets)))
; (<! (timeout rate))
(recur))))
(Thread/sleep 20000)
))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment