Skip to content

Instantly share code, notes, and snippets.

@elben
Last active May 8, 2016 12:40
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save elben/ef9c8b96e476e5606b95 to your computer and use it in GitHub Desktop.
Save elben/ef9c8b96e476e5606b95 to your computer and use it in GitHub Desktop.
Using core.async for Producer-consumer Systems

Using core.async for Producer-consumer Systems

# Set up input file
repeat 10000 echo "." >> input

# Run inline
time lein run inline < input > output

# Run async
time lein run inline < input > output

Copyright © 2014 Elben Shira

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.

(ns test-speed.core
(:gen-class)
(:require [clojure.core.async :as async]))
(defn println-err [& args]
(binding [*out* *err*]
(apply println args)))
(defn process
"Do 'work'"
[line]
(Thread/sleep 10)
line)
(def stdin-reader
(java.io.BufferedReader. *in*))
(def in-chan (async/chan))
(def out-chan (async/chan))
(defn start-async-consumers
"Start num-consumers threads that will consume work
from the in-chan and put it into the out-chan."
[num-consumers]
(dotimes [_ num-consumers]
(async/thread
(while true
(let [line (async/<!! in-chan)
data (process line)]
(async/>!! out-chan data))))))
(defn start-async-aggregator
"Take items from the out-chan and print it."
[]
(async/thread
(while true
(let [data (async/<!! out-chan)]
(println data)))))
(defn -main
""
[& args]
(let [[mode num-times] args]
(println-err args)
(case mode
"async"
(do
(start-async-consumers 8)
(start-async-aggregator)
(doseq [line (line-seq stdin-reader)]
(async/>!! in-chan line)))
"inline"
;; Read each line from stdin
(doseq [line (line-seq stdin-reader)]
(let [data (process line)]
(println data))))))
(defproject test-speed "1.0.0"
:description "core.async producer-consumer workflow"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://elbenshira.com/blog/using-core-async-for-producer-consumer-workflows/"}
:dependencies [[org.clojure/clojure "1.7.0-alpha1"]
[org.clojure/core.async "0.1.338.0-5c5012-alpha"]]
:main ^:skip-aot test-speed.core
:target-path "target/%s"
:profiles {:uberjar {:aot :all}})
@viperscape
Copy link

(defn create-input-file []
  (spit "input"
        (apply str (take 10000 (repeat ".\r\n")))))

@viperscape
Copy link

Had some fun altering this a bit so it runs within clojure without using the terminal; overall it's similar. It also has a 3rd choice with streams: https://gist.github.com/viperscape/f0e13ab6227e8fe1de4d

@elben
Copy link
Author

elben commented Oct 9, 2014

Timing it in Clojure seems like a better idea than my use of time :)

@ralph-tice
Copy link

Things I learned getting this code to run locally:

  1. lein new test-speed is easiest way to drop in someone else's project.clj & core.clj (match their namespace, drop the core & project .clj's on top of the templated ones)
  2. zsh has a 'repeat' shell builtin
  3. Elben's Macbook Air is 10x faster than mine :(

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment