Skip to content

Instantly share code, notes, and snippets.

Created January 6, 2011 18:27
Show Gist options
  • Save drone29a/768301 to your computer and use it in GitHub Desktop.
Save drone29a/768301 to your computer and use it in GitHub Desktop.
(import java.util.concurrent.Executors)
(use '[plumbing.core :only [wait-until with-log]])
(require '[work.core :as work]
'[fetcher.core :as fetcher])
;; Feed URLs that queue-input cycles through to build benchmark input.
;; NOTE(review): nearly every entry is an empty string and the few non-empty
;; ones look like URL fragments -- presumably the real URLs were stripped
;; before publication. Restore real feed URLs before running the benchmark.
(def test-feed-urls
["" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" ",7&mode=rss" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "" "atom" "" "" "" ""])
(defn fu-sync
  "Returns a zero-argument function that runs a blocking-fetch benchmark.

  When invoked, the returned function creates a fixed thread pool of
  `num-threads` threads, calls `producer` repeatedly until it returns nil,
  and submits one task per produced item. Each task slurps the item's :url
  and then appends that URL to a shared atom.

  The returned function yields a map with:
    :p  the thread pool (the caller is responsible for shutting it down)
    :f  the atom holding the vector of URLs fetched so far"
  [producer num-threads]
  (fn []
    (let [done  (atom [])
          pool  (Executors/newFixedThreadPool num-threads)
          ;; Lazily pull work from the producer; a nil item marks the end.
          feeds (take-while (complement nil?) (repeatedly producer))]
      (doseq [feed feeds]
        (let [url (:url feed)
              ;; Hint as Runnable so .submit resolves to the Runnable overload.
              ^Runnable task (fn []
                               (slurp url)
                               (swap! done conj url))]
          (.submit pool task)))
      {:p pool :f done})))
(defn queue-input
  "Returns a zero-argument producer function over a finite queue of feeds.

  The queue holds `count` URLs taken by cycling through test-feed-urls.
  Each call to the producer removes the next URL and returns it as
  {:key url :url url}; once the queue is exhausted (or the next entry is
  falsy) it returns nil.

  Taking an item is done with a compare-and-set loop so that concurrent
  callers never receive the same item twice and never skip one."
  ;; `count` shadows clojure.core/count inside this function only; the body
  ;; does not need the core function.
  [count]
  (let [xs (atom (take count (cycle test-feed-urls)))]
    (fn []
      (loop []
        (let [pending @xs]
          ;; Only hand out (first pending) if we are the caller that
          ;; successfully advanced the queue past it; otherwise retry.
          (if (compare-and-set! xs pending (rest pending))
            (when-let [x (first pending)]
              {:key x :url x})
            (recur)))))))
(defn fu-async
  "Returns a zero-argument function that starts an asynchronous-fetch
  benchmark.

  When invoked, the returned function hands `producer` to work/queue-work
  as the :in source, with fetcher/fetch (wrapped by with-log at :error
  level) as the work function and an :out callback that appends the :url
  of each result to a shared atom.

  The returned function yields a map with:
    :p  whatever work/queue-work returns
        NOTE(review): time-reqs calls .shutdown on this value -- confirm
        that work/queue-work returns something that supports it.
    :f  the atom holding the vector of URLs reported via :out so far"
  [producer]
  (fn []
    (let [finished (atom [])
          work-q   (work/queue-work
                    {:f       (with-log :error fetcher/fetch)
                     :in      producer
                     :out     (fn [x] (swap! finished conj (:url x)))
                     :threads (work/available-processors)
                     :exec    work/async})]
      {:p work-q :f finished})))
(defn time-reqs
  "Runs a benchmark thunk and reports how many URLs were fetched.

  Arguments:
    f     zero-argument function returning a map with :f (an atom holding
          a collection of finished URLs) and :p (a pool supporting .shutdown),
          as produced by fu-sync or fu-async
    num   number of finished URLs to wait for
    secs  passed to wait-until as its second argument
          (presumably a timeout in seconds -- confirm against plumbing.core)

  Prints the elapsed time of the wait, shuts the pool down, then prints a
  summary line. Returns nil."
  [f num secs]
  (let [{finished :f
         pool     :p} (f)]
    ;; Shut the pool down even if the timed wait throws, so worker threads
    ;; are not left running after a failed benchmark.
    (try
      (time (wait-until #(>= (count @finished) num) secs))
      (finally
        (.shutdown pool)))
    (println (format "Fetched %d of %d URLs." (count @finished) num))))
;;; Example
;;; ;; Asynchronously request 1000 URLs, see how long it takes to get 900 responses,
;;; ;; but don't wait longer than 60 seconds.
;;; (time-reqs (fu-async (queue-input 1000)) 900 60)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment