Skip to content

Instantly share code, notes, and snippets.

@JacobNinja
Last active April 6, 2022 09:14
Show Gist options
  • Star 15 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save JacobNinja/5c98496a632e1a466cbf to your computer and use it in GitHub Desktop.
Save JacobNinja/5c98496a632e1a466cbf to your computer and use it in GitHub Desktop.
Clojure core.async pipeline example
(require '[clojure.core.async :as async]
'[clj-http.client :as client]
'[clojure.data.json :as json])
(def concurrency 5)
(let [in (async/chan)
out (async/chan)
request-handler (fn [url out*]
(async/go
(println "Making request:" url)
(let [response (client/get url)
body (json/read-str (:body response))]
(doseq [repo (body "items")]
(async/>! out (repo "clone_url"))))
; Finally close the channel to signal finished processing
(async/close! out*)))]
; Process `in` messages concurrently
(async/pipeline-async concurrency out request-handler in)
; Push URLs to process
(async/go
(doseq [url (for [page (range 10)] (str "https://api.github.com/search/repositories?q=language:clojure&page="
(inc page)))]
(async/>! in url)))
; Print results of processing
(async/go-loop []
(println (async/<! out))
(recur)))
; `in` can be backed by a redis queue
(comment
(async/go-loop []
(if-let [message (pop-redis-queue)]
(async/>! in message)
; Sleep if no messages available
(async/<! (async/timeout 1000)))
(recur)))
@JacobNinja
Copy link
Author

Result:

Making request: https://api.github.com/search/repositories?q=language:clojure&page=1
Making request: https://api.github.com/search/repositories?q=language:clojure&page=3
Making request: https://api.github.com/search/repositories?q=language:clojure&page=4
Making request: https://api.github.com/search/repositories?q=language:clojure&page=2
Making request: https://api.github.com/search/repositories?q=language:clojure&page=5
Making request: https://api.github.com/search/repositories?q=language:clojure&page=6
Making request: https://api.github.com/search/repositories?q=language:clojure&page=7
https://github.com/ckirkendall/enfocus.git
https://github.com/tonsky/rum.git
https://github.com/jkk/honeysql.git
https://github.com/clojure/java.jdbc.git
https://github.com/weavejester/lein-ring.git
https://github.com/ckirkendall/kioo.git
https://github.com/technomancy/slamhound.git
https://github.com/yieldbot/flambo.git
https://github.com/someteam/acha.git
https://github.com/xsc/lein-ancient.git
https://github.com/joshaber/clojurem.git
https://github.com/jamesmacaulay/zelkova.git
https://github.com/ztellman/gloss.git
https://github.com/rplevy/swiss-arrows.git
https://github.com/omcljs/om-cookbook.git
https://github.com/joyofclojure/book-source.git
https://github.com/amalloy/useful.git
https://github.com/liquidz/misaki.git
https://github.com/gigasquid/wonderland-clojure-katas.git
https://github.com/reiddraper/simple-check.git
https://github.com/racehub/om-bootstrap.git
https://github.com/ptaoussanis/clojure-web-server-benchmarks.git
https://github.com/metosin/compojure-api.git
https://github.com/dgrnbrg/spyscope.git
https://github.com/james-henderson/chord.git
https://github.com/clojure/tools.namespace.git
https://github.com/fhd/clostache.git
https://github.com/Factual/skuld.git
https://github.com/damballa/parkour.git
https://github.com/jaycfields/expectations.git
https://github.com/cemerick/piggieback.git
https://github.com/slagyr/speclj.git
https://github.com/benzap/flyer.js.git
https://github.com/Datomic/day-of-datomic.git
https://github.com/jalehman/react-tutorial-om.git
https://github.com/ztellman/rhizome.git
https://github.com/liebke/cljr.git
https://github.com/Raynes/conch.git
https://github.com/riemann/riemann-jvm-profiler.git
https://github.com/weavejester/codox.git
https://github.com/drcode/webfui.git
https://github.com/immutant/immutant.git
https://github.com/magnars/optimus.git
https://github.com/bodil/BODOL.git
https://github.com/arthuredelstein/clooj.git
https://github.com/clojurewerkz/elastisch.git
https://github.com/clojure/test.check.git
https://github.com/frenchy64/typed-clojure.git
https://github.com/lynaghk/cljx.git
https://github.com/ztellman/manifold.git
https://github.com/aphyr/tesser.git
https://github.com/juxt/jig.git
https://github.com/tailrecursion/javelin.git
https://github.com/nuroko/nurokit.git
https://github.com/weavejester/environ.git
https://github.com/mmcgrana/clj-stacktrace.git
https://github.com/JulianBirch/cljs-ajax.git
https://github.com/r0man/sablono.git
https://github.com/weavejester/ragtime.git
https://github.com/ibdknox/jayq.git
https://github.com/fogus/trammel.git
https://github.com/ptaoussanis/nippy.git
https://github.com/cgrand/enliven.git
https://github.com/Datomic/codeq.git
https://github.com/hraberg/deuce.git
https://github.com/Raynes/tentacles.git
https://github.com/mcohen01/amazonica.git
https://github.com/k2nr/ViChrome.git
https://github.com/cemerick/pomegranate.git
https://github.com/gcv/appengine-magic.git
https://github.com/rkneufeld/lein-try.git
https://github.com/yogthos/markdown-clj.git
https://github.com/scgilardi/slingshot.git
https://github.com/brandonbloom/fipp.git
https://github.com/levand/domina.git
https://github.com/rbrush/clara-rules.git
https://github.com/semperos/clj-webdriver.git
https://github.com/ztellman/penumbra.git
https://github.com/magnars/prone.git
https://github.com/Netflix/PigPen.git
https://github.com/liebke/avout.git
https://github.com/MichaelDrogalis/dire.git
https://github.com/aaronc/freactive.git
https://github.com/juxt/bidi.git
https://github.com/aboekhoff/congomongo.git
https://github.com/yogthos/Selmer.git
https://github.com/pyr/cyanite.git
https://github.com/jonromero/music-as-data.git
https://github.com/maryrosecook/islaclj.git
https://github.com/mikera/core.matrix.git
https://github.com/4clojure/4clojure.git
https://github.com/swannodette/logic-tutorial.git
https://github.com/andrewvc/engulf.git
https://github.com/pkamenarsky/atea.git
https://github.com/michaelklishin/monger.git
https://github.com/clojure/core.match.git
https://github.com/funcool/buddy.git
https://github.com/gdeer81/marginalia.git
https://github.com/whamtet/Excel-REPL.git
https://github.com/puniverse/pulsar.git
https://github.com/macourtney/Conjure.git
https://github.com/richhickey/clojure-contrib.git
https://github.com/drewr/postal.git
https://github.com/wit-ai/duckling.git
https://github.com/LightTable/LightTable.git
https://github.com/Prismatic/hiphip.git
https://github.com/clojure-android/lein-droid.git
https://github.com/clojure/clojurescript.git
https://github.com/ztellman/automat.git
https://github.com/ptaoussanis/carmine.git
https://github.com/technomancy/leiningen.git
https://github.com/adamwynne/twitter-api.git
https://github.com/relevance/labrepl.git
https://github.com/jonase/kibit.git
https://github.com/omcljs/om.git
https://github.com/oakes/lein-fruit.git
https://github.com/cemerick/austin.git
https://github.com/mmcgrana/ring.git
https://github.com/overtone/overtone.git
https://github.com/purnam/purnam.git
https://github.com/arcadia-unity/Arcadia.git
https://github.com/cognitect/transit-format.git
https://github.com/weavejester/compojure.git
https://github.com/LauJensen/clojureql.git
https://github.com/tailrecursion/hoplon.git
https://github.com/bhauman/lein-figwheel.git
https://github.com/aphyr/riemann.git
https://github.com/clojure/tools.cli.git
https://github.com/takeoutweight/clojure-scheme.git
https://github.com/noprompt/frak.git
https://github.com/functional-koans/clojure-koans.git
https://github.com/weavejester/cljfmt.git
https://github.com/google/hesokuri.git
https://github.com/Factual/drake.git
https://github.com/swannodette/mori.git
https://github.com/technomancy/robert-hooke.git
https://github.com/puppetlabs/trapperkeeper.git
https://github.com/clojure-liberator/liberator.git
https://github.com/incanter/incanter.git
https://github.com/clojurebook/ClojureProgramming.git
https://github.com/dakrone/clojure-opennlp.git
https://github.com/ztellman/aleph.git
https://github.com/clojure/core.logic.git
https://github.com/nathanmarz/storm-deploy.git
https://github.com/pedestal/app-tutorial.git
https://github.com/quil/quil.git
https://github.com/plexus/chestnut.git
https://github.com/swannodette/enlive-tutorial.git
https://github.com/yogthos/clj-pdf.git
https://github.com/pedestal/pedestal.git
https://github.com/onyx-platform/onyx.git
https://github.com/noir-clojure/lib-noir.git
https://github.com/ztellman/potemkin.git
https://github.com/reagent-project/reagent.git
https://github.com/stuartsierra/component.git
https://github.com/stuarthalloway/programming-clojure.git
https://github.com/Raynes/fs.git
https://github.com/magomimmo/modern-cljs.git
https://github.com/noir-clojure/noir.git
https://github.com/oakes/Nightweb.git
https://github.com/elastic/es2unix.git
https://github.com/clojure-cookbook/clojure-cookbook.git
https://github.com/ptaoussanis/sente.git
https://github.com/nathanmarz/specter.git
https://github.com/cgrand/moustache.git
https://github.com/tonsky/datascript.git
https://github.com/ztellman/lamina.git
https://github.com/circleci/frontend.git
https://github.com/clojure/algo.monads.git
https://github.com/Engelberg/instaparse.git
https://github.com/schani/clojurec.git
https://github.com/jonase/eastwood.git
https://github.com/budu/lobos.git
https://github.com/nathanmarz/cascalog.git
https://github.com/emezeske/lein-cljsbuild.git
https://github.com/gf3/secretary.git
https://github.com/tomjakubowski/weasel.git
https://github.com/frankiesardo/icepick.git
https://github.com/krisajenkins/yesql.git
https://github.com/clojure/tools.nrepl.git
https://github.com/Datomic/simulant.git
https://github.com/weavejester/hiccup.git
https://github.com/dakrone/clj-http.git
https://github.com/Prismatic/om-tools.git
https://github.com/AvisoNovate/pretty.git
https://github.com/Prismatic/plumbing.git
https://github.com/swannodette/lt-cljs-tutorial.git
https://github.com/levand/quiescent.git
https://github.com/kawasima/jagrid.git
https://github.com/cgrand/enlive.git
https://github.com/pallet/pallet.git
https://github.com/LuxLang/lux.git
https://github.com/marick/Midje.git
https://github.com/clojure/core.typed.git
https://github.com/imalooney/t3tr0s.git
https://github.com/Prismatic/schema.git
https://github.com/Prismatic/dommy.git
https://github.com/aphyr/jepsen.git
https://github.com/Day8/re-frame.git
https://github.com/korma/Korma.git
https://github.com/dakrone/cheshire.git
https://github.com/clojure/core.async.git
https://github.com/oakes/play-clj.git
https://github.com/daveray/seesaw.git
https://github.com/venantius/ultra.git
https://github.com/cemerick/friend.git
https://github.com/ptaoussanis/timbre.git
https://github.com/boot-clj/boot.git
https://github.com/oakes/Nightcode.git
Making request: https://github.com/hugoduncan/criterium.githttps://api.github.com/search/repositories?q=language:clojure&page=8

Making request:Making request:  https://api.github.com/search/repositories?q=language:clojure&page=10https://api.github.com/search/repositories?q=language:clojure&page=9

https://github.com/michaelklishin/langohr.git
https://github.com/franks42/clj-ns-browser.git
https://github.com/jochu/swank-clojure.git
https://github.com/hraberg/mimir.git
https://github.com/mtgred/netrunner.git
https://github.com/ninjudd/clojure-protobuf.git
https://github.com/weavejester/clj-aws-s3.git
https://github.com/mjul/docjure.git
https://github.com/rosado/clj-processing.git
https://github.com/brunoV/throttler.git
https://github.com/weavejester/reagi.git
https://github.com/aphyr/timelike.git
https://github.com/stuartsierra/clojure-hadoop.git
https://github.com/sbtourist/nimrod.git
https://github.com/youngnh/parsatron.git
https://github.com/mikera/clisk.git
https://github.com/overtone/at-at.git
https://github.com/PrecursorApp/om-i.git
https://github.com/amitrathore/swarmiji.git
https://github.com/ctford/leipzig.git
https://github.com/clojure/math.combinatorics.git
https://github.com/weavejester/clout.git
https://github.com/richhickey/harmonikit.git
https://github.com/yieldbot/marceline.git
https://github.com/magnars/stasis.git
https://github.com/davidsantiago/hickory.git
https://github.com/weavejester/medley.git
https://github.com/runexec/chp.git
https://github.com/LonoCloud/synthread.git
https://github.com/razum2um/clj-debugger.git
https://github.com/frenchy64/Logic-Starter.git
https://github.com/sunng87/slacker.git
https://github.com/Prismatic/fnhouse.git
https://github.com/clojure/tools.trace.git
https://github.com/stuartsierra/lazytest.git
https://github.com/clojure-emacs/cider-nrepl.git
https://github.com/r0man/cljs-http.git
https://github.com/cemerick/clojurescript.test.git
https://github.com/reagent-project/reagent-cookbook.git
https://github.com/cgrand/parsley.git
https://github.com/swannodette/om-sync.git
https://github.com/owainlewis/falkor.git
https://github.com/clojure/core.cache.git
https://github.com/LightTable/fetch.git
https://github.com/ninjudd/cake.git
https://github.com/heroku/pulse.git
https://github.com/Chouser/clojure-jna.git
https://github.com/tcr/mug.git
https://github.com/kumarshantanu/lein-exec.git
https://github.com/juxt/juxt-accounting.git
https://github.com/xsc/rewrite-clj.git
https://github.com/mmcgrana/fleetdb.git
https://github.com/gtrak/no.disassemble.git
https://github.com/TheClimateCorporation/claypoole.git
https://github.com/someben/skream.git
https://github.com/taylorlapeyre/oj.git
https://github.com/clojure/tools.logging.git
https://github.com/puppetlabs/puppetdb.git
https://github.com/elastic/stream2es.git
https://github.com/overtone/shadertone.git
https://github.com/ato/clojars-web.git
https://github.com/gigasquid/clj-drone.git
https://github.com/halgari/mjolnir.git
https://github.com/sgrove/omchaya.git
https://github.com/ptaoussanis/tower.git
https://github.com/ReadyForZero/babbage.git
https://github.com/ibdknox/crate.git
https://github.com/trptcolin/reply.git
https://github.com/danielsz/system.git
https://github.com/sjl/metrics-clojure.git
https://github.com/ibdknox/pinot.git
https://github.com/clojure/data.json.git
https://github.com/leonardoborges/bouncer.git
https://github.com/killme2008/defun.git
https://github.com/adamtornhill/code-maat.git
https://github.com/funcool/cats.git
https://github.com/zcaudate/vinyasa.git
https://github.com/jamii/strucjure.git
https://github.com/clojure-numerics/expresso.git
https://github.com/clojure-clutch/clutch.git
https://github.com/mattrepl/clj-oauth.git
https://github.com/ztellman/vertigo.git
https://github.com/cch1/http.async.client.git
https://github.com/xeqi/kerodon.git
https://github.com/miner/herbert.git
https://github.com/alanning/meteor-load-test.git
https://github.com/luminus-framework/luminus-template.git
https://github.com/dpapathanasiou/tweet-secret.git
https://github.com/duelinmarkers/clj-record.git
https://github.com/swannodette/hello-cljsc.git

@ieugen
Copy link

ieugen commented Apr 6, 2022

I have tried to implement the code using redis, to try it out and it works once.
Trying to understand where I am going wrong:

(ns ro.ieugen.training.async.pipeline
  "https://gist.github.com/JacobNinja/5c98496a632e1a466cbf"
  (:require [clojure.core.async :as async]
            [clj-http.client :as client]
            [clojure.data.json :as json]
            [taoensso.carmine :as car :refer (wcar)]))

(def concurrency 5)
(def server1-conn {:pool {}
                   :spec {:url "redis://localhost:5672/"}})
(defmacro wcar* [& body] `(car/wcar server1-conn ~@body))

(comment
  (wcar*
   (car/ping)
   (car/set "foo" "bar")
   (car/get "foo")
   (car/lpush "queue" "a" "b" "c" "d"))
  (wcar* (car/lpop "queue"))

  ; `in` can be backed by a redis queue
  (let [in (async/chan)
        out (async/chan)
        request-handler (fn [url out*]
                          (async/go
                            (println "Making request:" url)
                            (let [response (client/get url)
                                  body (json/read-str (:body response))]
                              (doseq [repo (get body "items")]
                                (async/>! out (repo "clone_url")))
                              (async/close! out*))
                            (println "Done!" url)))]
    (async/pipeline-async concurrency out request-handler in)
    ; `in` can be backed by a redis queue
    (async/go-loop []
      (if-let [message (wcar* (car/lpop "queue"))]
        (do
          (println "Got message from queue" message)
          (async/>! in message))
      ; Sleep if no messages available
        (do
          (println "sleeping")
          (async/<! (async/timeout 1000))))
      (recur))
    (async/go-loop
     []
      (println (async/<! out))
      (recur)))
  
  ;; send stuff to redis so they get processed.
  (doseq [url (for [page (range 10)]
                (str "https://api.github.com/search/repositories?q=language:clojure&page=" (inc page)))]
    (wcar* (car/lpush "queue" url)))

  0)

After the first processing, the out channel is closed.
That blocks any other future processing.
I can push to redis multiple times and I would like a continuous process.
Any idea what am I missing?

{:paths ["src"]
 :deps {clj-http/clj-http {:mvn/version "3.12.3"}
        org.clojure/data.json {:mvn/version "2.4.0"}
        org.clojure/clojure {:mvn/version "1.11.1"}
        org.clojure/core.async {:mvn/version "1.5.648"}
        com.taoensso/carmine {:mvn/version "3.1.0"}}
 :aliases
 ;; using :deps in aliases will replace deps. Use :extra-deps if you need all
 {:dev {:extra-deps {org.clojure/test.check {:mvn/version "1.1.1"}}}
  :build {:deps {io.github.clojure/tools.build {:git/tag "v0.8.1" :git/sha "7d40500"}}
          :ns-default build}
  :build-params {:deps {io.github.clojure/tools.build {:git/tag "v0.8.1" :git/sha "7d40500"}}
                 :ns-default build-params}}}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment