example of blocking core.async pipeline
;; add to your ns form:
;; (:require [clojure.core.async :refer [chan close! go go-loop <! >!! pipeline-blocking]])
;; channels: from-ch feeds the pipeline, to-ch receives its results
(def to-ch (chan))
(def from-ch (chan))
;; my-map holds the fetched data keyed by URL, e.g. {url data, ...}
(def my-map (atom {}))
(defn fetch-from-url [{:keys [url]}]
  (println "going to fetch from" url)
  (let [data (slurp url)]
    ;; return the URL together with the fetched body
    {:url url :data data}))
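;; save results to the atom one at a time; stop once to-ch is closed
;; (a closed channel yields nil, which would otherwise loop forever)
(go-loop []
  (when-let [{:keys [url data]} (<! to-ch)]
    (swap! my-map assoc url data)
    (recur)))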
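;; pipeline with parallelism 10; argument order is (n to xf from),
;; so items are read from from-ch and results are put on to-ch
(def pipeline-1 (pipeline-blocking 10 to-ch (map fetch-from-url) from-ch))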
(def urls ["http://google.com"
           "http://yahoo.com"
           "http://facebook.com"
           "http://amazon.com"])
;; start the processing
(doseq [url urls]
  (>!! from-ch {:url url}))
@escherize

Great stuff!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment