@JacobNinja
Last active April 6, 2022 09:14
Clojure core.async pipeline example
(require '[clojure.core.async :as async]
         '[clj-http.client :as client]
         '[clojure.data.json :as json])

(def concurrency 5)

(let [in (async/chan)
      out (async/chan)
      ; pipeline-async calls this with one input and a fresh result channel (out*)
      request-handler (fn [url out*]
                        (async/go
                          (println "Making request:" url)
                          ; Note: client/get blocks; consider async/thread for heavier workloads
                          (let [response (client/get url)
                                body (json/read-str (:body response))]
                            (doseq [repo (body "items")]
                              ; Put results on out*, not out; the pipeline forwards them to out
                              (async/>! out* (repo "clone_url"))))
                          ; Finally close the channel to signal finished processing
                          (async/close! out*)))]
  ; Process `in` messages concurrently
  (async/pipeline-async concurrency out request-handler in)
  ; Push URLs to process
  (async/go
    (doseq [url (for [page (range 10)]
                  (str "https://api.github.com/search/repositories?q=language:clojure&page="
                       (inc page)))]
      (async/>! in url)))
  ; Print results of processing; stop once `out` is closed
  (async/go-loop []
    (when-some [result (async/<! out)]
      (println result)
      (recur))))

; `in` can be backed by a redis queue
; (pop-redis-queue is a placeholder; this loop belongs inside the `let` above)
(comment
  (async/go-loop []
    (if-let [message (pop-redis-queue)]
      (async/>! in message)
      ; Sleep if no messages available
      (async/<! (async/timeout 1000)))
    (recur)))
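
The contract that `pipeline-async` expects from its handler can be tried without any network access. The following is a minimal sketch, not part of the original gist: `fake-handler` and `run-demo` are hypothetical names, and the handler fabricates its results instead of calling the GitHub API. It shows that each handler call receives its own result channel, puts any number of values on it, and closes it to signal that the input is done.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the HTTP handler: no network, it just derives
;; three fake results per input and delivers them on the result channel.
(defn fake-handler [page result-ch]
  (async/go
    (doseq [n (range 3)]
      (async/>! result-ch (str "page-" page "-item-" n)))
    ;; Closing result-ch tells pipeline-async this input is finished.
    (async/close! result-ch)))

(defn run-demo []
  (let [in  (async/chan)
        out (async/chan)]
    (async/pipeline-async 2 out fake-handler in)
    ;; Feed the inputs; onto-chan! closes `in` afterwards, which in turn
    ;; lets the pipeline close `out` once all results are delivered.
    (async/onto-chan! in [1 2])
    ;; Collect everything from `out` into a vector (blocks until `out` closes).
    (async/<!! (async/into [] out))))

(run-demo)
```

Because `in` is closed here, `out` closes too and `run-demo` returns. For a long-running consumer, such as the Redis-backed loop, `in` would stay open instead.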

ieugen commented Apr 6, 2022

I tried implementing this with Redis as the queue, and it works once.
I am trying to understand where I am going wrong:

(ns ro.ieugen.training.async.pipeline
  "https://gist.github.com/JacobNinja/5c98496a632e1a466cbf"
  (:require [clojure.core.async :as async]
            [clj-http.client :as client]
            [clojure.data.json :as json]
            [taoensso.carmine :as car :refer (wcar)]))

(def concurrency 5)
(def server1-conn {:pool {}
                   :spec {:url "redis://localhost:5672/"}})
(defmacro wcar* [& body] `(car/wcar server1-conn ~@body))

(comment
  (wcar*
   (car/ping)
   (car/set "foo" "bar")
   (car/get "foo")
   (car/lpush "queue" "a" "b" "c" "d"))
  (wcar* (car/lpop "queue"))

  ; `in` can be backed by a redis queue
  (let [in (async/chan)
        out (async/chan)
        request-handler (fn [url out*]
                          (async/go
                            (println "Making request:" url)
                            (let [response (client/get url)
                                  body (json/read-str (:body response))]
                              (doseq [repo (get body "items")]
                                (async/>! out (repo "clone_url")))
                              (async/close! out*))
                            (println "Done!" url)))]
    (async/pipeline-async concurrency out request-handler in)
    ; `in` can be backed by a redis queue
    (async/go-loop []
      (if-let [message (wcar* (car/lpop "queue"))]
        (do
          (println "Got message from queue" message)
          (async/>! in message))
      ; Sleep if no messages available
        (do
          (println "sleeping")
          (async/<! (async/timeout 1000))))
      (recur))
    (async/go-loop
     []
      (println (async/<! out))
      (recur)))
  
  ;; send stuff to redis so they get processed.
  (doseq [url (for [page (range 10)]
                (str "https://api.github.com/search/repositories?q=language:clojure&page=" (inc page)))]
    (wcar* (car/lpush "queue" url)))

  0)

After the first batch is processed, the out channel is closed.
That blocks any further processing.
I can push to Redis multiple times, and I would like the process to run continuously.
Any idea what I am missing?

{:paths ["src"]
 :deps {clj-http/clj-http {:mvn/version "3.12.3"}
        org.clojure/data.json {:mvn/version "2.4.0"}
        org.clojure/clojure {:mvn/version "1.11.1"}
        org.clojure/core.async {:mvn/version "1.5.648"}
        com.taoensso/carmine {:mvn/version "3.1.0"}}
 :aliases
 ;; using :deps in aliases will replace deps. Use :extra-deps if you need all
 {:dev {:extra-deps {org.clojure/test.check {:mvn/version "1.1.1"}}}
  :build {:deps {io.github.clojure/tools.build {:git/tag "v0.8.1" :git/sha "7d40500"}}
          :ns-default build}
  :build-params {:deps {io.github.clojure/tools.build {:git/tag "v0.8.1" :git/sha "7d40500"}}
                 :ns-default build-params}}}
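
One possible cause of a stalled pipeline, offered as an assumption rather than a confirmed diagnosis of the code above: if the handler throws before it reaches `close!` (clj-http throws on non-2xx responses by default), the result channel for that input is never closed and that pipeline slot never frees up. The sketch below closes the result channel in a `finally` block and puts values on the per-request result channel rather than on `out`. `fetch`, `safe-handler`, and `run-demo` are hypothetical names; `fetch` stands in for the HTTP call and JSON parsing and fails on purpose for one input.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical fetch function standing in for client/get + JSON parsing;
;; it throws for one input to simulate a failed HTTP request.
(defn fetch [url]
  (if (= url "bad")
    (throw (ex-info "request failed" {:url url}))
    [(str url "/repo-a") (str url "/repo-b")]))

(defn safe-handler [url result-ch]
  ;; async/thread keeps blocking work off the go-block thread pool.
  (async/thread
    (try
      (doseq [item (fetch url)]
        (async/>!! result-ch item))
      (catch Exception e
        (println "Request failed:" url (ex-message e)))
      (finally
        ;; Always close, otherwise this pipeline slot never frees up.
        (async/close! result-ch)))))

(defn run-demo []
  (let [in  (async/chan)
        out (async/chan)]
    (async/pipeline-async 2 out safe-handler in)
    ;; The failing input sits between two good ones.
    (async/onto-chan! in ["a" "bad" "b"])
    (async/<!! (async/into [] out))))

(run-demo)
```

The failed input contributes no results, but the inputs after it are still processed.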