Last active May 8, 2016 12:40
Using core.async for Producer-consumer Systems

# Set up input file (repeat is a zsh builtin)
repeat 10000 echo "." >> input

# Run inline
time lein run inline < input > output

# Run async
time lein run async < input > output

Copyright © 2014 Elben Shira

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.

;; core.clj
(ns test-speed.core
  (:require [clojure.core.async :as async]))

(defn println-err [& args]
  (binding [*out* *err*]
    (apply println args)))

(defn process
  "Do 'work'"
  [line]
  (Thread/sleep 10)
  line)

(def stdin-reader
  (java.io.BufferedReader. *in*))

(def in-chan (async/chan))
(def out-chan (async/chan))

(defn start-async-consumers
  "Start num-consumers threads that will consume work
  from the in-chan and put it into the out-chan."
  [num-consumers]
  (dotimes [_ num-consumers]
    ;; Each consumer blocks on in-chan in its own thread.
    (async/thread
      (while true
        (let [line (async/<!! in-chan)
              data (process line)]
          (async/>!! out-chan data))))))

(defn start-async-aggregator
  "Take items from the out-chan and print them."
  []
  (async/thread
    (while true
      (let [data (async/<!! out-chan)]
        (println data)))))

(defn -main
  [& args]
  (let [[mode num-times] args]
    (println-err args)
    (case mode
      "async"
      (do
        (start-async-consumers 8)
        (start-async-aggregator)
        ;; Producer: feed each line from stdin to the consumers
        (doseq [line (line-seq stdin-reader)]
          (async/>!! in-chan line)))
      "inline"
      ;; Read each line from stdin
      (doseq [line (line-seq stdin-reader)]
        (let [data (process line)]
          (println data))))))

;; project.clj
(defproject test-speed "1.0.0"
  :description "core.async producer-consumer workflow"
  :url ""
  :license {:name "Eclipse Public License"
            :url ""}
  :dependencies [[org.clojure/clojure "1.7.0-alpha1"]
                 [org.clojure/core.async "0.1.338.0-5c5012-alpha"]]
  :main ^:skip-aot test-speed.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all}})
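
The consumer and aggregator loops in core.clj run `while true`, so they never find out that the input has ended. One possible way to let the pipeline finish is to close the channels: a take from a closed, drained channel returns nil, which each loop can treat as a stop signal. The sketch below is only an illustration under that assumption. `run-pipeline` is a hypothetical helper that is not part of the gist, it collects results instead of printing them, and it uses a 1 ms sleep as the stand-in for work.

```clojure
(require '[clojure.core.async :as async])

(defn process
  "Stand-in for the gist's 'work': sleep briefly, then return the line."
  [line]
  (Thread/sleep 1)
  line)

(defn run-pipeline
  "Hypothetical helper (not in the gist): pushes lines through
  num-consumers worker threads and returns a vector of the processed
  results. Result order is not guaranteed."
  [lines num-consumers]
  (let [in-chan  (async/chan)
        out-chan (async/chan)
        ;; Each worker loops until in-chan is closed and drained;
        ;; <!! then returns nil and the loop ends.
        workers  (doall
                  (repeatedly num-consumers
                              #(async/thread
                                 (loop []
                                   (when-some [line (async/<!! in-chan)]
                                     (async/>!! out-chan (process line))
                                     (recur))))))
        ;; The aggregator collects results until out-chan is closed.
        results  (async/thread
                   (loop [acc []]
                     (if-some [data (async/<!! out-chan)]
                       (recur (conj acc data))
                       acc)))]
    ;; Producer: feed every line, then signal that no more input is coming.
    (doseq [line lines]
      (async/>!! in-chan line))
    (async/close! in-chan)
    ;; The channel returned by async/thread closes when the worker's body
    ;; returns, so these takes block until every worker has finished.
    (doseq [w workers]
      (async/<!! w))
    ;; Only now is it safe to close out-chan and read the collected results.
    (async/close! out-chan)
    (async/<!! results)))
```

For example, `(count (run-pipeline (repeat 100 ".") 8))` returns 100 once every worker has drained its input.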

(defn create-input-file []
  (spit "input"
        (apply str (take 10000 (repeat ".\r\n")))))

Had some fun altering this a bit so it runs within Clojure without using the terminal; overall it's similar. It also has a 3rd choice with streams:

elben commented Oct 9, 2014

Timing it in Clojure seems like a better idea than my use of time :)
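
A minimal sketch of what timing inside Clojure could look like, assuming the same 10 ms `process` stand-in as the gist. `run-inline`, `run-async`, and `elapsed-ms` are hypothetical names introduced here, not taken from the gist or from the comment above, and the input is an in-memory sequence rather than stdin.

```clojure
(require '[clojure.core.async :as async])

(defn process
  "Same stand-in for work as in the gist: sleep 10 ms, return the line."
  [line]
  (Thread/sleep 10)
  line)

(defn run-inline
  "Process every line on the calling thread."
  [lines]
  (doseq [line lines]
    (process line)))

(defn run-async
  "Process lines on num-consumers threads and block until all are done."
  [lines num-consumers]
  (let [in-chan (async/chan)
        workers (doall
                 (repeatedly num-consumers
                             #(async/thread
                                (loop []
                                  ;; nil means in-chan is closed and drained
                                  (when-some [line (async/<!! in-chan)]
                                    (process line)
                                    (recur))))))]
    (doseq [line lines]
      (async/>!! in-chan line))
    (async/close! in-chan)
    ;; Wait for each worker thread to finish.
    (doseq [w workers]
      (async/<!! w))))

(defn elapsed-ms
  "Call the zero-argument function f and return the wall-clock time in ms."
  [f]
  (let [start (System/nanoTime)]
    (f)
    (/ (- (System/nanoTime) start) 1e6)))

(let [lines (repeat 200 ".")]
  (println "inline ms:" (elapsed-ms #(run-inline lines)))
  (println "async ms: " (elapsed-ms #(run-async lines 8))))
```

With 10 ms of sleep per item, the inline run should take roughly 2 seconds for 200 items, while eight consumers should need a fraction of that. The exact numbers depend on the machine and JVM warm-up.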

Things I learned getting this code to run locally:

  1. lein new test-speed is the easiest way to drop in someone else's project.clj and core.clj (match their namespace, then drop their core.clj and project.clj on top of the templated ones)
  2. zsh has a 'repeat' shell builtin
  3. Elben's MacBook Air is 10x faster than mine :(
