Skip to content

Instantly share code, notes, and snippets.

@halgari
Created August 22, 2013 16:24
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save halgari/6309500 to your computer and use it in GitHub Desktop.
Save halgari/6309500 to your computer and use it in GitHub Desktop.
Load balancer using core.async
(ns async-examples.load-balancer
(:require [clojure.core.async :refer :all]))
;; Let's assume we have a DB, and it holds the following pairs of name/ages
(def db-store
{:john 42
:amy 33
:jill 3
:james 4})
;; Now let's assume that we have two operations with a operand on one
;; operation
(defn query [op & args]
(case op
:get-people (keys db-store)
:get-age (db-store (first args))))
;; Most DBs have a connection limit, so let's simulate that
(def MAX-CONNECTIONS 4)
(def access-count (atom 0))
(defn db-operation [& args]
(swap! access-count inc)
(assert (<= @access-count MAX-CONNECTIONS))
(let [result (apply query args)]
(Thread/sleep 100) ;; simulates a slower db
(swap! access-count dec)
result))
;; So what we want to do, is some connection pooling / load balancing. This is normally
;; pretty hard, but with core.async it's rather trivial:
(def db-chan (let [c (chan MAX-CONNECTIONS)]
(dotimes [x MAX-CONNECTIONS]
(thread
(while true
(let [[command ret-chan] (<!! c)]
(>!! ret-chan (apply db-operation command))))))
c))
;; Now lets' write a helper function that will make querying the DB a
;; bit simpler from inside a async blog
(defn async-query [& operations]
(let [c (chan 1)]
(go (>! db-chan [operations c]))
c))
;; Now we have everything we need to create 100 gos and do some
;; queries. Notice how the 100 threads are balanced over the
;; MAX-CONNECTIONS db threads. In this way we don't overload the DB
;; server, while still taking advantage of the large number of go threads.
(def println-chan (let [c (chan MAX-CONNECTIONS)]
(thread
(while true
(println (<!! c))))
c))
(defn join!!
"takes a sequence of gos waits until all gos complete"
[args]
(doseq [s (doall args)]
(<!! s)))
(defn -main []
(join!! (for [x (range 100)]
(go
(let [people (<! (async-query :get-people))]
(>! println-chan people)
(doseq [person people]
(>! println-chan [person (<! (async-query :get-age person))])))))))
@cyppan
Copy link

cyppan commented May 31, 2016

Thanks for sharing, here an alternative implementation of the same behavior using core.async/pipeline-blocking
Your approach is more straightforward when you need a reply for your asynchronous command. (my result chan proxies to the sender to achieve similar behavior)

(ns playground
  (:require [clojure.core.async :refer :all]))


;; Let's assume we have a DB, and it holds the following pairs of name/ages

(def db-store
  {:john  42
   :amy   33
   :jill  3
   :james 4})

;; Now let's assume that we have two operations with a operand on one
;; operation

(defn query [op & args]
  (case op
    :get-people (keys db-store)
    :get-age (db-store (first args))))

;; Most DBs have a connection limit, so let's simulate that

(def MAX-CONNECTIONS 4)
(def access-count (atom 0))

(defn db-operation [[sender args]]
  (swap! access-count inc)
  (assert (<= @access-count MAX-CONNECTIONS))
  (let [result (apply query args)]
    (Thread/sleep 1000) ;; simulates a slower db
    (swap! access-count dec)
    [sender result]))

(def command-chan (chan MAX-CONNECTIONS))
(def result-chan (let [c (chan MAX-CONNECTIONS)]
                   (go-loop []
                     ;; Sending back result to the sender specified when calling the command
                     (let [[sender result] (<! c)]
                       (>! sender result))
                     (recur))
                   c))

(defn async-query
  "execute queries asynchronously and push the result in the returned channel"
  [& args]
  (let [c (chan)]
    (go (>! command-chan [c args]))
    c))

;; avoid logs to overlap
(def println-chan (let [c (chan MAX-CONNECTIONS)]
                    (go-loop []
                      (println (<! c))
                      (recur))
                    c))

(defn -main []
  ;; The pipeline blocking func uses the map transducer in parallel (capped at MAX-CONNECTIONS)
  ;; and transmit the result to the result-chan
  (pipeline-blocking MAX-CONNECTIONS result-chan (clojure.core/map db-operation) command-chan)
  (dotimes [n 10]
    (go
      (let [people (<! (async-query :get-people))]
        (>! println-chan people)
        (doseq [person people]
          (>! println-chan [person (<! (async-query :get-age person))]))))))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment