Skip to content

Instantly share code, notes, and snippets.

@minimal
Created November 17, 2009 00:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save minimal/236492 to your computer and use it in GitHub Desktop.
Save minimal/236492 to your computer and use it in GitHub Desktop.
;; Discussed in detail at http://technomancy.us/130
(ns wide-finder
"A basic map/reduce approach to the wide finder using agents.
Optimized for being idiomatic and readable rather than speed."
(:use [clojure.contrib.duck-streams :only [reader]]))
(def re #"GET /(\d+)")
(defn inc-or-init [i]
(if i (inc i) 1))
(defn count-line
"Increment the relevant entry in the counts map."
[counts line]
(if-let [[_ hit] (re-find re line)]
(update-in counts [hit] inc-or-init)
counts))
(defn count-lines
"count a bunch of lines"
[counts lines]
(reduce count-line counts lines))
(defn find-widely
"Return a map of pages to hit counts in filename."
[filename n]
;; each agent begins as an empty map.
(let [agents (map agent (repeat n {}))]
;; For each line in the file, send an agent the job of counting it.
(dorun (map #(send %1 count-line %2)
(cycle agents) ; infinite seq of all agents
(line-seq (reader filename))))
;; Wait for each agent to finish.
(doseq [a agents] (await a))
;; Reduce the results into a single count value.
(apply merge-with + (map deref agents))))
(defn find-widely-chunked
"Return a map of pages to hit counts in filename."
[filename n chunksize]
;; each agent begins as an empty map.
(let [agents (map agent (repeat n {}))]
;; For each line in the file, send an agent the job of counting it.
(dorun (map #(send %1 count-lines %2)
(cycle agents) ; infinite seq of all agents
(partition chunksize (line-seq (reader filename)))))
;; Wait for each agent to finish.
(doseq [a agents] (await a))
;; Reduce the results into a single count value.
(apply merge-with + (map deref agents))))
(defn find-widely-single
"Non parallel version"
[filename]
(reduce count-line {} (line-seq (reader filename))))
(defn testall
[]
(do
[(println "parallel 2 core" (time (find-widely "n:/tmp/log.txt" 2)))
(println "parallel 4 cores" (time (find-widely "n:/tmp/log.txt" 4)))
(println "non parallel" (time (find-widely-single "n:/tmp/log.txt")))]))
(testall)
(println "chunked parallel" (time (find-widely-chunked "n:/tmp/log2.txt" 4 50)))
(println "non parallel" (time (find-widely-single "n:/tmp/log2.txt")))
;; time: 915.114642 msecs"
;; non parallel {1 16681, 2 16677, 3 16710, 4 16332, 5 16581, 6 16693, 7 16575, 8 16746, 9 16741, 10 16525}
;; "Elapsed time: 3214.297911 msecs"
;; parallel {1 16681, 2 16677, 3 16710, 4 16332, 5 16581, 6 16693, 7 16575, 8 16746, 9 16741, 10 16525}
;; "Elapsed time: 4546.566562 msecs"
;; parallel 2 core {1 16681, 2 16677, 3 16710, 4 16332, 5 16581, 6 16693, 7 16575, 8 16746, 9 16741, 10 16525}
;; "Elapsed time: 3575.291225 msecs"
;; parallel 4 cores {1 16681, 2 16677, 3 16710, 4 16332, 5 16581, 6 16693, 7 16575, 8 16746, 9 16741, 10 16525}
;; "Elapsed time: 909.037236 msecs"
;; non parallel {1 16681, 2 16677, 3 16710, 4 16332, 5 16581, 6 16693, 7 16575, 8 16746, 9 16741, 10 16525}
;; "Elapsed time: 540.495585 msecs"
;; chunked parallel 4 cores {1 16681, 2 16677, 3 16710, 4 16332, 5 16581, 6 16693, 7 16575, 8 16746, 9 16741, 10 16525}
;; "Elapsed time: 5421.381623 msecs"
;; 40MB log chunked parallel {1 167009, 2 167039, 3 166517, 4 166548, 5 166321, 6 166705, 7 166552, 8 166615, 9 166281, 10 167314}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment