Skip to content

Instantly share code, notes, and snippets.

@alanmarazzi
Forked from JacobNinja/pipeline.clj
Created October 26, 2020 11:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alanmarazzi/bc0b9291611c864c1679a28e68c65db9 to your computer and use it in GitHub Desktop.
Save alanmarazzi/bc0b9291611c864c1679a28e68c65db9 to your computer and use it in GitHub Desktop.
Clojure core.async pipeline example
(require '[clojure.core.async :as async]
'[clj-http.client :as client]
'[clojure.data.json :as json])
(def concurrency 5)
(let [in (async/chan)
out (async/chan)
request-handler (fn [url out*]
(async/go
(println "Making request:" url)
(let [response (client/get url)
body (json/read-str (:body response))]
(doseq [repo (body "items")]
(async/>! out (repo "clone_url"))))
; Finally close the channel to signal finished processing
(async/close! out*)))]
; Process `in` messages concurrently
(async/pipeline-async concurrency out request-handler in)
; Push URLs to process
(async/go
(doseq [url (for [page (range 10)] (str "https://api.github.com/search/repositories?q=language:clojure&page="
(inc page)))]
(async/>! in url)))
; Print results of processing
(async/go-loop []
(println (async/<! out))
(recur)))
; `in` can be backed by a redis queue
(comment
(async/go-loop []
(if-let [message (pop-redis-queue)]
(async/>! in message)
; Sleep if no messages available
(async/<! (async/timeout 1000)))
(recur)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment