@tdantas
Created January 28, 2018 16:04
producer->consumers consumers->producer channel communication
(ns my.shame
  (:require [clojure.core.async :as async
             :refer [timeout go-loop close! chan sliding-buffer tap mult go >! <! alts!]]))
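
(defn producer
  "Starts a go-loop that owns the balance and returns
  [multiplexer worker->producer stop]. Put any value on `stop` to end the loop."
  [balance]
  (let [producer->worker (chan (sliding-buffer 1)) ; keeps only the latest balance
        multiplexer      (mult producer->worker)   ; fans each balance out to every tap
        worker->producer (chan 1)
        stop             (chan 1)]
    (go-loop [current-balance balance
              wait            1000]
      ;; Wait for a stop signal, a worker reply, or `wait` ms, whichever comes first.
      (let [[value channel] (alts! [stop worker->producer (timeout wait)])]
        (cond
          (= channel stop)
          (println "Shutdown producer ... ")

          (= channel worker->producer)
          (do
            (println "Producer received back from workers")
            ;; Publishes the balance as it was before this reply is added;
            ;; the updated balance goes out on the next iteration.
            (>! producer->worker current-balance)
            (recur (+' value current-balance) wait))

          ;; Timeout: re-publish the current balance.
          :fallback
          (do
            (>! producer->worker current-balance)
            (recur current-balance wait)))))
    [multiplexer worker->producer stop]))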

(defn worker
  "Repeatedly takes a balance from its tap and sends the incremented value back."
  [producer->worker-ch worker->producer-ch]
  (go-loop []
    (<! (timeout 3000)) ; slowing down the workers
    (>! worker->producer-ch (inc (<! producer->worker-ch)))
    (recur)))

(defn app
  "Starts the producer and `n-workers` workers, each on its own tap of the
  producer's mult. Returns the stop channel, which stops only the producer loop."
  [n-workers initial-balance]
  (let [[producer->worker-multiplexer worker->producer stop-channel] (producer initial-balance)]
    (dotimes [_ n-workers]
      (let [producer->worker (chan 1)]
        (tap producer->worker-multiplexer producer->worker)
        (worker producer->worker worker->producer)))
    stop-channel))