Skip to content

Instantly share code, notes, and snippets.

@viperscape
Created September 3, 2014 15:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save viperscape/f0e13ab6227e8fe1de4d to your computer and use it in GitHub Desktop.
Save viperscape/f0e13ab6227e8fe1de4d to your computer and use it in GitHub Desktop.
async chans and kuroshio streams
(ns chan-test.core
(:gen-class)
(:require [clojure.core.async :as async]
[kuroshio.core :as k]))
(defn create-string []
(apply str (take 10000 (repeat ".\r\n"))))
(defn println-err [& args]
(binding [*out* *err*]
(apply println args)))
(defn process
"Do 'work'"
[line]
(Thread/sleep 10)
line)
(defn reader-seq [s]
(line-seq (java.io.BufferedReader. (java.io.StringReader. s))))
(def in-chan (async/chan))
(def out-chan (async/chan))
(def wait-chan (async/chan))
(defn start-async-consumers
"Start num-consumers threads that will consume work
from the in-chan and put it into the out-chan."
[num-consumers]
(dotimes [_ num-consumers]
(async/thread
(loop []
(let [d (async/<!! in-chan)]
(when-not (= d ::quit)
(async/>!! out-chan (process d))
(recur)))))))
(defn start-async-aggregator
"Take items from the out-chan"
[]
(async/thread
(dotimes [n 10000]
(async/<!! out-chan))
(async/>!! wait-chan ::done)))
(defn async-test []
(start-async-consumers 8)
(start-async-aggregator)
(doseq [line (reader-seq (create-string))]
(async/>!! in-chan line))
(dotimes [n 8]
(async/>!! in-chan ::quit)) ;;shutdown workers
(async/<!! wait-chan))
(defn sync-test []
(doseq [line (reader-seq (create-string))]
(process line)))
(defn stream-test []
(let [work (k/new-stream)
results (k/new-stream)]
(dotimes [n 8] ;;setup workers
(future (loop []
(let [d (k/take! work)]
(when-not (= d ::quit)
(k/put! results (process d))
(recur))))))
(doseq [line (reader-seq (create-string))]
(k/put! work line))
(dotimes [n 8]
(k/put! work ::quit)) ;;shutdown workers
(dotimes [n 10000]
(k/take! results))))
(defn -main[& args]
;(time (sync-test)) ;;always 10x10000
(print "async-test ") (time (async-test)) ;; 12.5 sec
(print "stream-test ") (time (stream-test))) ;; 12.5 sec
(defproject chan-test "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.7.0-alpha1"]
[org.clojure/core.async "0.1.338.0-5c5012-alpha"]
[kuroshio "0.2.3-SNAPSHOT"]]
:main ^:skip-aot chan-test.core
:target-path "target/%s"
:profiles {:uberjar {:aot :all}})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment