@joinr
Created September 19, 2019 21:41
A simple core.async example of splitting up processing flows using pub/sub and pipeline-blocking
(ns workdist
  (:require [clojure.core.async :as a :refer [chan >!! <!! <! >! go go-loop]]))
;;define some simple workloads
(def payloads
  {:red   (vec "red")
   :green (vec "green")
   :blue  (vec "blue")})
;;Generate 100 messages of :red :green :blue categories
(def messages
  (->> (cycle [:red :green :blue])
       (map (fn [x] {:category x :data (payloads x)}))
       (take 100)))
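;;Illustrative check (an addition, not part of the original gist):
;;cycling three categories and taking 100 gives 100 = 3*33 + 1, so the
;;first category, :red, gets one extra message. This recomputes the
;;per-category counts on its own; demo-category-counts is a hypothetical name.
(def demo-category-counts
  (frequencies (take 100 (cycle [:red :green :blue]))))
;;demo-category-counts => {:red 34, :green 33, :blue 33}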
;;A central intake channel for jobs.
(def <jobs> (a/chan 1024))
;;Define a publication where we split inputs into
;;topics based on :category.
(def job-pub (a/pub <jobs> :category))
;;Convenience function to subscribe an existing channel to
;;the job board by topic. Subscribes <in> and returns it.
(defn sub-job! [<in> topic]
  (a/sub job-pub topic <in>)
  <in>)
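;;A minimal, self-contained sketch of the pub/sub routing used above
;;(an illustrative addition; demo-pubsub-result and the local channel
;;names are hypothetical). a/pub sends each value only to channels
;;subscribed to its topic, values whose topic has no subscriber are
;;dropped, and closing the source closes the subscribed channels.
(require '[clojure.core.async :as a])

(def demo-pubsub-result
  (let [in   (a/chan 10)
        p    (a/pub in :category)
        red  (a/chan 10)
        blue (a/chan 10)]
    (a/sub p :red red)
    (a/sub p :blue blue)
    (a/>!! in {:category :red :n 1})
    (a/>!! in {:category :green :n 2}) ; no :green subscriber: dropped
    (a/>!! in {:category :blue :n 3})
    (a/close! in)                      ; subs close too (close? defaults to true)
    {:red  (a/<!! (a/into [] red))
     :blue (a/<!! (a/into [] blue))}))
;;demo-pubsub-result => {:red [{:category :red, :n 1}], :blue [{:category :blue, :n 3}]}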
;;Simple work function, for use in our transducer later.
(defn process-data [{:keys [data]}]
  (str (apply str data) "-processed"))
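;;A sketch of why a transducer is convenient here (illustrative;
;;demo-process is a hypothetical stand-alone copy of process-data).
;;The same (map f) transducer can be applied to a plain collection or
;;attached to a channel, so the work function can be exercised on
;;ordinary data before it is handed to pipeline-blocking.
(require '[clojure.core.async :as a])

(defn demo-process [{:keys [data]}]
  (str (apply str data) "-processed"))

;;Applied eagerly to a collection.
(def demo-seq-result
  (into [] (map demo-process) [{:category :red :data (vec "red")}]))
;;demo-seq-result => ["red-processed"]

;;Attached to a buffered channel: values are transformed as they are put.
(def demo-chan-result
  (let [c (a/chan 1 (map demo-process))]
    (a/>!! c {:category :blue :data (vec "blue")})
    (a/close! c)
    (a/<!! (a/into [] c))))
;;demo-chan-result => ["blue-processed"]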
;;Subscribe one buffered channel to each independent
;;category.
(def consumers {:red   (sub-job! (a/chan 100) :red)
                :green (sub-job! (a/chan 100) :green)
                :blue  (sub-job! (a/chan 100) :blue)})
;;Define an output channel for each category to publish results
;;on.
(def outs {:red   (a/chan 100)
           :green (a/chan 100)
           :blue  (a/chan 100)})
;;Define blocking workers that pull jobs from each category's
;;consumer channel, process them synchronously, and push the
;;results onto the matching output channel. Raising the parallelism
;;from 1 to n runs n worker threads per category; pipeline-blocking
;;still emits results in the order their inputs were received.
(doseq [k (keys outs)]
  (a/pipeline-blocking 1 (outs k) (map process-data) (consumers k)))
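;;A self-contained sketch of the ordering guarantee described above
;;(illustrative; demo-ordered-output is hypothetical). Four worker
;;threads process 0..4, and earlier inputs sleep longer so later ones
;;tend to finish first, yet the output still arrives in input order.
(require '[clojure.core.async :as a])

(def demo-ordered-output
  (let [in  (a/chan 10)
        out (a/chan 10)]
    (a/pipeline-blocking 4 out
                         (map (fn [i]
                                (Thread/sleep (* 10 (- 5 i))) ; 50ms for 0 ... 10ms for 4
                                (* i i)))
                         in)
    (doseq [i (range 5)] (a/>!! in i))
    (a/close! in)                      ; pipeline closes out once in is drained
    (a/<!! (a/into [] out))))
;;demo-ordered-output => [0 1 4 9 16]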
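;;Log results by polling all of the pipelines' output channels as
;;values become available. A channel that alts! reports as closed
;;(value nil) is dropped, so the loop ends once every output closes.
(go-loop [chs (vec (vals outs))]
  (when (seq chs)
    (let [[v c] (a/alts! chs)]
      (if (nil? v)
        (recur (filterv #(not= c %) chs))
        (do (println [:processed v])
            (recur chs))))))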
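;;An alternative fan-in (a sketch, not the original author's approach):
;;a/merge combines several channels into one that closes only after
;;all of its sources have closed, so a single take loop suffices.
;;demo-merged is a hypothetical name; arrival order across sources is
;;not guaranteed, hence the sort.
(require '[clojure.core.async :as a])

(def demo-merged
  (let [c1 (a/chan 2)
        c2 (a/chan 2)]
    (a/>!! c1 1)
    (a/>!! c1 2)
    (a/close! c1)
    (a/>!! c2 3)
    (a/close! c2)
    (sort (a/<!! (a/into [] (a/merge [c1 c2]))))))
;;demo-merged => (1 2 3)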
;;Push all the messages onto the <jobs> channel. The publication
;;distributes each one to the channel subscribed to its category,
;;the blocking pipelines process it, and the logging go-loop picks
;;up and prints each result.
;;Passing false for close? keeps <jobs> open, so more jobs can be
;;submitted later. (core.async 1.2+ deprecates onto-chan in favor of
;;onto-chan!, which takes the same arguments.)
(a/onto-chan <jobs> messages false)
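;;A small sketch of the close? argument (illustrative; demo-onto-result
;;is hypothetical). With close? false the target channel stays open and
;;still accepts puts; with the default (true) it is closed once the
;;collection has been delivered, so further puts return false.
(require '[clojure.core.async :as a])

(def demo-onto-result
  (let [open-ch   (a/chan 5)
        closed-ch (a/chan 5)]
    (a/<!! (a/onto-chan open-ch [1 2] false)) ; wait until both values are put
    (a/<!! (a/onto-chan closed-ch [1 2]))
    {:open   (a/>!! open-ch 3)      ; true: channel still open
     :closed (a/>!! closed-ch 3)})) ; false: channel already closed
;;demo-onto-result => {:open true, :closed false}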