Skip to content

Instantly share code, notes, and snippets.

@pdkovacs
Last active August 20, 2017 16:15
Show Gist options
  • Save pdkovacs/1876954cd8e2016440ca07cb6b1d860e to your computer and use it in GitHub Desktop.
Save pdkovacs/1876954cd8e2016440ca07cb6b1d860e to your computer and use it in GitHub Desktop.
Exploring the first example in "Core.Async in Use - Timothy Baldridge / Clojure West 2017"
(ns org.bitkitchen.explore.async.batch
(:require [clojure.core.async
:as a
:refer [>! <! >!! <!! go chan close! pipeline]]))
;;;::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
;;; TEST TARGETS
;;; Core.Async in Use - Timothy Baldridge / Clojure West 2017
;;;::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
;;; The initial test target
(defn batch-0
[in out max-size]
(go
(loop [acc nil]
(let [v (<! in)]
(if (nil? v)
(close! out)
(let [new-acc (concat acc v)]
(if (>= (count new-acc) max-size)
(when (>! out new-acc)
(recur nil))
(recur new-acc))))))))
;;; An optimized test target
(defn batch-optimized
[in out max-size]
(let [xf (comp cat (partition-all max-size))]
(pipeline 1 out xf in)))
;;;::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
;;; OMG, the behaviour of the initial test target and the "improved" test
;;; target is completely different. Already by the look of it, I suspected this
;;; was a con man's talk. :-(
;;;::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
;;;::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
;;; Functions supporting the test execution
;;;::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
(defn make-readable
"Makes the consumed data more readable"
[coll]
(map (fn [x] (str x ",")) coll))
(defn notify-on-completion
"Used by the test-consumer thread to notify of the closure of the channel to be consumed"
[channel-to-notify]
(>!! channel-to-notify "test consumer completed"))
(defn start-test-consumer
[channel-to-consume, channel-to-notify-on-completion]
(go
(loop []
(let [v (<! channel-to-consume)]
(if (nil? v)
(notify-on-completion channel-to-notify-on-completion)
(do
(println ">>>>>>>>> external consumer:: consumed size:" (count v) ", consumed data:" (apply str (make-readable v)))
(recur)))))))
(defn start-feeding-test-input
[channel-to-feed, test-input, max-input-quantum, batch-size, batch-count]
(loop [total-transmit-count 0]
(let [input-quantum (inc (rand-int max-input-quantum))
next-transmit-count input-quantum
data (take next-transmit-count (drop total-transmit-count test-input))]
;(println "input-quantum: " input-quantum)
(>!! channel-to-feed data)
(let [new-total-transmit-count (+ total-transmit-count next-transmit-count)]
(if (> new-total-transmit-count (* batch-size batch-count))
(close! channel-to-feed)
(recur new-total-transmit-count))))))
(defn wait-for-test-consumer-completion [channel-to-wait-on]
(<!! channel-to-wait-on))
;;;::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
;;; Test execution
;;;::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
(def batch-size 32)
(def batch-count 2)
(def max-input-quantum 3)
(def test-input (range 2000))
(def completion-notification-channel (chan))
(def in (chan 1))
(def out (chan))
(start-test-consumer out completion-notification-channel)
(batch-0 in out batch-size)
;(batch-optimized in out batch-size)
(start-feeding-test-input in, test-input, max-input-quantum, batch-size, batch-count)
(wait-for-test-consumer-completion completion-notification-channel)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment