Created September 19, 2019 21:41
-
-
Save joinr/3cca221e2df37550e0f334371a03baba to your computer and use it in GitHub Desktop.
A simple core.async example of splitting up processing flows using pub/sub and pipeline-blocking
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns workdist | |
(:require [clojure.core.async :as a :refer [chan >!! <!! <! >! go go-loop]])) | |
;; Sample workloads keyed by category. Each value is the category's own
;; name as a vector of characters, e.g. :red => [\r \e \d].
(def payloads
  (into {}
        (map (fn [category] [category (vec (name category))]))
        [:red :green :blue]))
;; The first 100 job messages from an endless rotation of :red, :green and
;; :blue. Each message holds its category and that category's payload.
(def messages
  (take 100
        (for [category (cycle [:red :green :blue])]
          {:category category
           :data     (get payloads category)})))
;; Central intake channel for every job; its buffer holds up to 1024
;; pending messages.
(def <jobs> (chan 1024))
;; Publication over <jobs>. Each message is routed to the subscribers of
;; the topic named by the message's :category value.
(def job-pub
  (a/pub <jobs> (fn [msg] (:category msg))))
;; Subscribes the caller-supplied channel <in> to a publication under
;; `topic` and returns <in>. This allows a subscribed channel to be built
;; inline, e.g. (sub-job! (a/chan 100) :red).
;; Note: the channel is NOT created here; the caller passes it in.
;;   ([<in> topic])             subscribes to the shared job-pub.
;;   ([publication <in> topic]) subscribes to an explicit publication.
(defn sub-job!
  ([<in> topic]
   (sub-job! job-pub <in> topic))
  ([publication <in> topic]
   (a/sub publication topic <in>)
   <in>))
;; Simple work function, used later as the transducer (map process-data).
;; Joins the characters in the message's :data sequence into a string and
;; appends "-processed".
;;   {:category :red :data [\r \e \d]} => "red-processed"
;; Only :data is read. A missing :data yields just "-processed".
(defn process-data [{:keys [data]}]
  (str (apply str data) "-processed"))
;; One consumer channel (buffer of 100) per category. Each channel is
;; subscribed to job-pub for its own topic, in the order :red, :green, :blue.
(def consumers
  (into {}
        (for [topic [:red :green :blue]]
          [topic (sub-job! (a/chan 100) topic)])))
;; One output channel (buffer of 100) per category. Each category's
;; processed results are published on its channel.
(def outs
  (zipmap [:red :green :blue]
          (repeatedly 3 #(a/chan 100))))
;; Start one blocking pipeline per category. Each pipeline:
;;   - takes messages from that category's consumer channel,
;;   - runs process-data on each one,
;;   - puts the results onto the matching output channel.
;; The first argument (1) is the number of workers per category. Raising it
;; to n lets n workers process a category concurrently, while results still
;; come out in the order the inputs arrived.
(doseq [[topic <out>] outs]
  (a/pipeline-blocking 1 <out> (map process-data) (get consumers topic)))
;; Log every processed result by polling all output channels as values
;; become available.
;; alts! returns [nil ch] as soon as ch is closed. If a closed channel stayed
;; in the polled set, the loop would spin forever printing [:processed nil].
;; So a closed channel is dropped, and the loop ends once every output
;; channel has closed.
(go-loop [live (set (vals outs))]
  (when (seq live)
    ;; alts! needs an indexed collection of ports, hence vec.
    (let [[v port] (a/alts! (vec live))]
      (if (nil? v)
        (recur (disj live port))
        (do (println [:processed v])
            (recur live))))))
;; Feed every message into <jobs>. From there:
;;   1. job-pub routes each message to the consumer for its :category,
;;   2. that category's blocking pipeline processes it,
;;   3. the logging go-loop prints the result.
;; close? is false, so <jobs> stays open after the last message is put.
(a/onto-chan <jobs> messages false)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.