-
-
Save tdantas/4d172af3451054290dc67541f5f71d60 to your computer and use it in GitHub Desktop.
producer->consumers consumers->producer channel communication
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Namespace for a small producer/worker demo built on core.async channels.
(ns my.shame
  (:require [clojure.core.async :as async
             :refer [timeout go-loop close! chan sliding-buffer tap mult go >! <! alts!]]))
(defn producer
  "Starts a go-loop that owns a running balance and broadcasts it to workers.

  On every iteration the loop waits on three channels at once:
    * `stop`             - any message (or closing it) ends the loop;
    * `worker->producer` - a value sent back by a worker; the current balance
                           is published and the received value is then added
                           to the balance for the next iteration;
    * a `wait` ms timeout - the current balance is re-published unchanged.

  Outgoing balances go through a `sliding-buffer` of size 1, so only the most
  recent unpublished balance is retained.

  Returns a vector `[multiplexer worker->producer stop]` where `multiplexer`
  is a `mult` over the outgoing channel (consumers `tap` it).

  Note: shutdown only ends this loop; the outgoing channel is left open, so
  tapped consumers are not notified that the producer has stopped."
  [balance]
  (let [producer->worker (chan (sliding-buffer 1))
        multiplexer      (mult producer->worker)
        worker->producer (chan 1)
        stop             (chan 1)]
    (go-loop [current-balance balance
              wait            1000]
      ;; alts! always yields a [value channel] pair, so a plain `let` is
      ;; enough; a nil `value` means the selected channel was closed
      ;; (which is the normal case for the timeout channel).
      (let [[value channel] (alts! [stop worker->producer (timeout wait)])]
        (cond
          (= channel stop)
          (println "Shutdown producer ... ")

          (= channel worker->producer)
          (if (nil? value)
            ;; The feedback channel was closed: nothing more can arrive and
            ;; adding nil to the balance would throw, so stop cleanly.
            (println "Shutdown producer ... ")
            (do
              (println "Producer received back from workers")
              (>! producer->worker current-balance)
              (recur (+' value current-balance) wait)))

          ;; Timeout fired: re-publish the unchanged balance.
          :fallback
          (do
            (>! producer->worker current-balance)
            (recur current-balance wait)))))
    [multiplexer worker->producer stop]))
(defn worker
  "Starts a go-loop that repeatedly pauses 3 seconds, takes one value from
  `producer->worker-ch`, and puts that value incremented by one onto
  `worker->producer-ch`.

  The loop terminates when `producer->worker-ch` is closed (the take yields
  nil) or when `worker->producer-ch` is closed (the put returns false),
  instead of throwing on `(inc nil)` or looping forever.

  Returns the channel of the go-loop."
  [producer->worker-ch worker->producer-ch]
  (go-loop []
    (<! (timeout 3000)) ; slowing down the workers
    ;; when-some: a nil take means the input channel was closed.
    (when-some [received (<! producer->worker-ch)]
      ;; >! returns false only when the target channel is closed.
      (when (>! worker->producer-ch (inc received))
        (recur)))))
(defn app
  "Wires one producer to `n-workers` workers.

  Creates the producer with `initial-balance`, then for each worker taps a
  fresh size-1 channel onto the producer's mult and starts a worker that
  reads from that tap and reports back on the shared feedback channel.

  Returns the producer's stop channel."
  [n-workers initial-balance]
  (let [[balance-mult feedback-ch stop-ch] (producer initial-balance)]
    (doseq [_ (range n-workers)]
      (let [inbox (chan 1)]
        (tap balance-mult inbox)
        (worker inbox feedback-ch)))
    stop-ch))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment