;; Web crawler example from the book "Clojure Programming". The book's code has been modified with the imports (missing from the book) needed to compile correctly. However, the crawl results look wrong: each agent seems to process only one URL and then stops for the rest of the run.
(require '[net.cgrand.enlive-html :as enlive]) ; This line requires enlive 1.0.0
(use '[clojure.string :only (lower-case)])
;; URL and MalformedURLException both live in java.net; the package symbol
;; has to be the first element of the import list.
(import '(java.net URL MalformedURLException))
;; The two imports below were not in the book, but essential for
;; subsequent code to work:
;; as-url is defined in clojure.java.io.
(use '[clojure.java.io :only (as-url)])
(import [java.util.concurrent BlockingQueue LinkedBlockingQueue])
;; Page 218:
(defn- links-from
  "Returns a lazy seq of java.net.URL objects, one for every :a element
  selected from `html` that carries an :href attribute, each resolved
  against `base-url`.

  Anchors without an href, and hrefs that cannot be turned into a URL,
  are dropped from the result."
  [base-url html]
  (remove nil?
          (for [link (enlive/select html [:a])]
            (when-let [href (-> link :attrs :href)]
              ;; The two-argument URL constructor throws on a malformed
              ;; spec. The empty catch body evaluates to nil, which the
              ;; surrounding `remove nil?` filters out.
              (try
                (URL. base-url href)
                (catch MalformedURLException e))))))
(defn- words-from
  "Returns a lazy seq of lower-cased words found in the text nodes under
  :body of `html`, after :script elements have been stripped out.
  Tokens that consist solely of digits are skipped."
  [html]
  (let [text-nodes (enlive/select (enlive/at html [:script] nil)
                                  [:body enlive/text-node])]
    (for [token (mapcat #(re-seq #"\w+" %) text-nodes)
          ;; re-matches only succeeds when the whole token is digits.
          :when (not (re-matches #"\d+" token))]
      (lower-case token))))
;; Page 219:
(def url-queue
  "Java thread-safe blocking queue of URLs that are waiting to be crawled."
  (new LinkedBlockingQueue))

(def crawled-urls
  "Atom holding the set of URLs that have already been crawled."
  (atom #{}))

(def word-freqs
  "Atom holding a map from word to the number of times it has been seen."
  (atom {}))
;; Page 220:
;; The agent actions and `run` are defined further down; declare their vars
;; up front so they can be referenced before their definitions.
(declare get-url run process handle-results)

;; Pool of 25 crawler agents. Each one starts out with get-url as its next
;; transition (::t) and shares the global url-queue.
(def agents
  (into #{}
        (repeatedly 25
                    (fn [] (agent {::t #'get-url :queue url-queue})))))
;; Below, our three agent actions: get-url, process,
;; handle-results. Note common structure using try.
;; Note below that *agent* is implicitly bound by Clojure to the
;; current agent; calling run on it queues the next transition,
;; based on ::t state.
(defn ^::blocking get-url
  "Agent action: takes the next entry from the state's :queue (blocking
  until one is available) and coerces it to a URL with as-url.

  Returns `state` unchanged when the URL is already in crawled-urls or
  when fetching it throws; otherwise returns a new state holding the
  :url, its slurped :content, and #'process as the next transition (::t).

  Whatever the outcome, the current agent is handed back to `run` so the
  next transition gets queued. The ::blocking metadata marks this action
  as one that may block on I/O."
  [{:keys [^BlockingQueue queue] :as state}]
  (let [url (as-url (.take queue))]
    (try
      (if (@crawled-urls url)
        state                   ; Already crawled, no change.
        {:url url               ; Otherwise, process new URL
         :content (slurp url)
         ::t #'process})
      (catch Exception e state) ; Skip any failing URL.
      ;; Always queue the next transition, even after a failure.
      (finally (run *agent*)))))
;; Page 221:
(defn process
  "Agent action: parses the fetched page held in the state's :content and
  returns a new state with

    ::t     #'handle-results, the next transition
    :url    the URL the content was fetched from
    :links  the URLs found on the page (see links-from)
    :words  a map from each word on the page to its occurrence count

  The current agent is always handed back to `run` afterwards, even when
  parsing throws."
  [{:keys [url content]}]
  (try
    ;; `content` is a string, so wrap it in a Reader before handing it to
    ;; enlive; the string itself is not callable.
    (let [html (enlive/html-resource (java.io.StringReader. content))]
      {::t #'handle-results
       :url url
       :links (links-from url html)
       ;; Equivalent to folding (update-in m [word] (fnil inc 0)) over the
       ;; words starting from an empty map.
       :words (frequencies (words-from html))})
    (finally (run *agent*))))
(defn ^::blocking handle-results
  "Agent action: folds the outcome of `process` into the shared crawler
  state and returns a fresh state whose next transition is #'get-url.

  Side effects: adds `url` to crawled-urls, puts every URL in `links` onto
  url-queue, and merges `words` into word-freqs by summing counts.

  The current agent is always handed back to `run` afterwards, even when
  one of the updates throws."
  [{:keys [url links words]}]
  (try
    ;; Update our three key states: crawled-urls, url-queue,
    ;; word-freqs
    (swap! crawled-urls conj url)
    (doseq [url links]
      (.put url-queue url))
    (swap! word-freqs (partial merge-with +) words)
    {::t #'get-url :queue url-queue}
    (finally (run *agent*))))
;; Page 222:
(defn paused?
  "Returns the value stored under ::paused in `agent`'s metadata: truthy
  once the agent has been paused, nil otherwise."
  [agent]
  (get (meta agent) ::paused))
(defn run
  "Queues the next transition on crawler agents.

  With no arguments, does so for every agent in `agents`. With one agent
  `a`, does nothing unless `a` is a member of `agents`; otherwise it sends
  `a` an action that, provided the agent is not paused, dispatches the
  transition stored under ::t in the agent's state. Transitions whose var
  carries ::blocking metadata are dispatched with send-off, the rest with
  send."
  ;; Two signatures, 0 and 1 arities:
  ([] (doseq [a agents] (run a)))
  ([a]
   (when (agents a)
     (send a (fn [{transition ::t :as state}]
               (when-not (paused? *agent*)
                 (let [dispatch-fn (if (-> transition meta ::blocking)
                                     send-off
                                     send)]
                   (dispatch-fn *agent* transition)))
               ;; This action only schedules the transition; the agent's
               ;; state must come back unchanged so the transition sees it.
               state)))))
;; Pause or restart just from using metadata:
(defn pause
  "Marks agents as paused by setting ::paused to true in their metadata.
  With no arguments, pauses every agent in `agents`; with one argument,
  pauses just that agent."
  ([]
   (dorun (map pause agents)))
  ([a]
   (alter-meta! a #(assoc % ::paused true))))
(defn restart
  "Clears the ::paused flag from agent metadata and queues the next
  transition again. With no arguments, restarts every agent in `agents`;
  with one argument, restarts just that agent."
  ([] (doseq [a agents] (restart a)))
  ([a]
   (alter-meta! a dissoc ::paused)
   (run a)))
(defn test-crawler
  "Resets all state associated with the crawler, adds the given URL to the
  url-queue, and runs the crawler for 10 seconds, returning a vector
  containing the number of URLs crawled, and the number of URLs
  accumulated through crawling that have yet to be visited."
  [agent-count starting-url]
  ;; Replace the global agent pool with `agent-count` fresh agents.
  (def agents (set (repeatedly agent-count
                               #(agent {::t #'get-url :queue url-queue}))))
  (.clear url-queue)
  (swap! crawled-urls empty)
  (swap! word-freqs empty)
  (.add url-queue starting-url)
  ;; Kick off every agent; without this nothing is ever taken off the queue.
  (run)
  (Thread/sleep 10000) ; Just 10s is enough to make the point.
  ;; Stop the agents from queuing further transitions before reading results.
  (pause)
  [(count @crawled-urls) (count url-queue)])
;; Testing
;; Each call runs the crawler from the given starting URL and returns
;; [urls-crawled urls-still-queued].
;; NOTE(review): the starting URL in both calls is an empty string. get-url
;; passes the queued value to as-url, which presumably needs a real URL —
;; TODO confirm the URL originally used and supply it before running.
(test-crawler 1 "") ;=> [1 268], not quite right
(test-crawler 20 "") ;=> [21 1990], also not right
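;; Resolution: line 63 of the code as originally posted should read (update-in m [word] (fnil inc 0)). The call was missing its opening parenthesis, so the reducing function in `process` returned (fnil inc 0) instead of the updated map.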

