Last active
August 20, 2017 16:15
-
-
Save pdkovacs/1876954cd8e2016440ca07cb6b1d860e to your computer and use it in GitHub Desktop.
Exploring the first example in "Core.Async in Use - Timothy Baldridge / Clojure West 2017"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns org.bitkitchen.explore.async.batch | |
(:require [clojure.core.async | |
:as a | |
:refer [>! <! >!! <!! go chan close! pipeline]])) | |
;;;:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: | |
;;; TEST TARGETS | |
;;; Core.Async in Use - Timothy Baldridge / Clojure West 2017 | |
;;;:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: | |
;;; The initial test target | |
(defn batch-0 | |
[in out max-size] | |
(go | |
(loop [acc nil] | |
(let [v (<! in)] | |
(if (nil? v) | |
(close! out) | |
(let [new-acc (concat acc v)] | |
(if (>= (count new-acc) max-size) | |
(when (>! out new-acc) | |
(recur nil)) | |
(recur new-acc)))))))) | |
;;; An optimized test target | |
(defn batch-optimized | |
[in out max-size] | |
(let [xf (comp cat (partition-all max-size))] | |
(pipeline 1 out xf in))) | |
;;;:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: | |
;;; OMG, the behaviour of the initial test target and the "improved" test | |
;;; target is completely different. Already by the look of it, I suspected this | |
;;; was a con man's talk. :-( | |
;;;:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: | |
;;;:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: | |
;;; Functions supporting the test execution | |
;;;:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: | |
(defn make-readable | |
"Makes the consumed data more readable" | |
[coll] | |
(map (fn [x] (str x ",")) coll)) | |
(defn notify-on-completion | |
"Used by the test-consumer thread to notify of the closure of the channel to be consumed" | |
[channel-to-notify] | |
(>!! channel-to-notify "test consumer completed")) | |
(defn start-test-consumer | |
[channel-to-consume, channel-to-notify-on-completion] | |
(go | |
(loop [] | |
(let [v (<! channel-to-consume)] | |
(if (nil? v) | |
(notify-on-completion channel-to-notify-on-completion) | |
(do | |
(println ">>>>>>>>> external consumer:: consumed size:" (count v) ", consumed data:" (apply str (make-readable v))) | |
(recur))))))) | |
(defn start-feeding-test-input | |
[channel-to-feed, test-input, max-input-quantum, batch-size, batch-count] | |
(loop [total-transmit-count 0] | |
(let [input-quantum (inc (rand-int max-input-quantum)) | |
next-transmit-count input-quantum | |
data (take next-transmit-count (drop total-transmit-count test-input))] | |
;(println "input-quantum: " input-quantum) | |
(>!! channel-to-feed data) | |
(let [new-total-transmit-count (+ total-transmit-count next-transmit-count)] | |
(if (> new-total-transmit-count (* batch-size batch-count)) | |
(close! channel-to-feed) | |
(recur new-total-transmit-count)))))) | |
(defn wait-for-test-consumer-completion [channel-to-wait-on] | |
(<!! channel-to-wait-on)) | |
;;;:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: | |
;;; Test execution | |
;;;:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: | |
(def batch-size 32) | |
(def batch-count 2) | |
(def max-input-quantum 3) | |
(def test-input (range 2000)) | |
(def completion-notification-channel (chan)) | |
(def in (chan 1)) | |
(def out (chan)) | |
(start-test-consumer out completion-notification-channel) | |
(batch-0 in out batch-size) | |
;(batch-optimized in out batch-size) | |
(start-feeding-test-input in, test-input, max-input-quantum, batch-size, batch-count) | |
(wait-for-test-consumer-completion completion-notification-channel) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment