Skip to content

Instantly share code, notes, and snippets.

@mad
Created April 25, 2011 18:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mad/940932 to your computer and use it in GitHub Desktop.
Save mad/940932 to your computer and use it in GitHub Desktop.
(ns wf
(:require (clojure.java.io/writer))
(:use [clojure.contrib.seq :only (indexed)])
(:import java.util.concurrent.atomic.AtomicLong))
;; Have the compiler warn wherever Java interop falls back to runtime
;; reflection, so missing type hints are reported when this file loads.
(set! *warn-on-reflection* true)
;;; Big wins:
;;; * Use queue instead of pmap.
;;; * Overlap chunking with actual runtime
;;;; Reading
(defn chunk-file
  "Splits the file at `filename` into at most `n` (treated as at least 1)
  contiguous byte ranges so that no line straddles two ranges.

  Returns a vector of [start end] pairs (end exclusive) that covers the
  whole file in order. Every boundary except 0 and the file length sits
  just after a newline byte. Empty ranges are dropped, so an empty file
  yields []. Prints \"chunking...\" before starting."
  [filename n]
  (println "chunking...")
  (with-open [file (java.io.RandomAccessFile. filename "r")]
    (let [len (.length file)
          n (max 1 n)
          nl (int \newline)
          ;; Move a raw offset forward to the start of a line. Back up one
          ;; byte, then read through the next newline. An offset that
          ;; already starts a line therefore stays put. Reading stops at
          ;; EOF (-1), so the last range may end without a newline.
          align (fn [offset]
                  (if (zero? offset)
                    0
                    (do (.seek file (long (dec offset)))
                        (loop []
                          (let [b (.read file)]
                            (when-not (or (== b -1) (== b nl))
                              (recur))))
                        (.getFilePointer file))))
          ;; Integer offsets spread evenly over the file. A Ratio step
          ;; would not be accepted by .seek. doall forces every seek to
          ;; run now, while the file is still open.
          starts (doall (map #(align (quot (* % len) n)) (range n)))
          ;; Offsets are non-decreasing. Dropping duplicates removes the
          ;; empty ranges left when one line spans several raw offsets.
          bounds (distinct (concat starts [len]))]
      (vec (map vector bounds (rest bounds))))))
;;; Writing
;; Unfinished draft of a line writer. It is wrapped in `comment`, so it is
;; never evaluated. As written it would not work: it ignores `file` and
;; `line`, splits a hard-coded string, and calls `map` without a collection.
(comment
(defn write-line [file line]
(let [ary (make-array String 6)
fields (dumbest-split "a\tb\tc\tb" \tab ary)]
(map fields))
)
)
(defn- bounded-input-stream
  "Returns an InputStream that reads from `in` but reports end-of-stream
  once `limit` bytes have been read through it. Closing it does not close
  `in`."
  [#^java.io.InputStream in limit]
  (let [left (atom (long limit))]
    (proxy [java.io.InputStream] []
      ;; proxy routes every `read` overload to this one fn, so all three
      ;; arities must be handled.
      (read
        ([]
           (if (pos? @left)
             (let [b (.read in)]
               (when-not (neg? b) (swap! left dec))
               b)
             -1))
        ([b]
           (let [#^"[B" buf b
                 #^java.io.InputStream self this]
             (.read self buf (int 0) (int (alength buf)))))
        ([b off len]
           (let [#^"[B" buf b
                 want (min (long len) (long @left))]
             (cond
               (zero? (long len)) 0
               (not (pos? want)) -1
               :else (let [n (.read in buf (int off) (int want))]
                       (when (pos? n) (swap! left - n))
                       n))))))))

(defn dolines
  "Calls `f` on each line of `file` that lies in the byte range
  [start-byte, end-byte). Returns nil.

  `start-byte` is expected to be the first byte of a line and `end-byte`
  the first byte after a newline (or the file length), as chunk-file
  produces. Input is decoded as US-ASCII. Line terminators are stripped
  by BufferedReader.readLine (LF, CR or CRLF)."
  [#^String file start-byte end-byte f]
  (with-open [stream (java.io.FileInputStream. file)]
    ;; Set the position exactly. InputStream.skip may skip fewer bytes
    ;; than requested.
    (.position (.getChannel stream) (long start-byte))
    ;; Cap the stream at the chunk's byte length. The reader then sees EOF
    ;; at end-byte, whatever line terminators the file uses, so no line
    ;; from the next chunk is read twice.
    (let [rdr (java.io.BufferedReader.
               (java.io.InputStreamReader.
                (java.io.BufferedInputStream.
                 (bounded-input-stream stream (- end-byte start-byte))
                 (* 8 131072))
                "US-ASCII")
               131072)]
      (loop [#^String line (.readLine rdr)]
        (when line
          (f line)
          (recur (.readLine rdr)))))))
(defn #^"[Ljava.lang.String;" dumbest-split
  "Splits `s` on the character `c` into the caller-supplied String array
  `tokens` and returns `tokens`. The array is reused, so no array is
  allocated per call.

  Field i goes into slot i. When `s` has more fields than `tokens` has
  slots, the last slot receives the unsplit remainder of `s`, separators
  included. Slots after the last one written are set to nil, so values
  from an earlier call on the same array never show up in this result.
  A zero-length `tokens` is returned unchanged."
  [#^String s c #^"[Ljava.lang.String;" tokens]
  (let [len (dec (int (alength tokens)))]
    (if (neg? len)
      tokens
      (loop [start (int 0)
             i (int 0)]
        (let [idx (int (.indexOf s (int c) (int start)))]
          (if (or (neg? idx) (>= i len))
            (do (aset tokens i (.substring s start))
                ;; Clear slots a previous, longer split may have filled.
                (java.util.Arrays/fill tokens (int (inc i)) (int (alength tokens)) nil)
                tokens)
            (do (aset tokens i (.substring s start idx))
                (recur (inc idx) (inc i)))))))))
;; Names for the fields of an input record, keyed by 0-based column
;; position: column i is named by the i-th keyword below.
(def template
  (zipmap (iterate inc 0)
          [:datetime :recordtype :msisn :imei
           :otherparty :typenumber :callduration :cellid]))
;; REPL scratch, wrapped in `comment` and so never evaluated. It formats one
;; sample record: each field is paired with its `template` name, then only
;; the values are kept and joined with "; ".
(comment
;;
(let
[real ["25 Aug 2003 09:02:32" "MTC" "78652964404" "3511034004353504" "" "" "50" "11972"]]
(apply str (interpose "; " (map #(get %1 1)
(map-indexed
(fn [index value]
(let [type (get template index)]
[type value])) real)))))
)
(defn parse-line
  "Splits the tab-separated `line` and writes its fields to `fo`, joined
  by \"; \" and terminated by \"\\r\\n\". Returns nil.

  `ary` is a scratch String array reused across calls to hold the fields.
  Its length caps the field count, and dumbest-split puts any excess into
  the last slot. `fo` may be any java.io.DataOutput, such as a
  RandomAccessFile. writeBytes keeps only the low 8 bits of each char."
  [line ary #^java.io.DataOutput fo]
  ;; Clear the scratch array so fields left by a previous, longer line on
  ;; the same array cannot leak into this one.
  (java.util.Arrays/fill #^"[Ljava.lang.Object;" ary nil)
  ;; \tab is the tab character; \t would be the letter t.
  (let [fields (dumbest-split line \tab ary)
        ;; Unused slots are nil. Genuinely empty fields are "" and are kept.
        out (apply str (interpose "; " (remove nil? fields)))]
    ;; One output may be shared by many worker threads. Serialize writes so
    ;; each output line stays contiguous.
    (locking fo
      (.writeBytes fo (str out "\r\n")))))
;; (when (and (= (aget fields 5) "\"GET")
;; ('#{"200" "304" "404"} status))
;; {:client (aget fields 0)
;; :url (aget fields 6)
;; :status status
;; :bytes (if (= bytes "-") 0 (Long/parseLong bytes))
;; :ref (.substring ref 1 (dec (count ref)))}))
;;;; Tallying
(defn bump!
  "Adds `delta` to the counter for key `k` in the atom `m`, which holds a
  map of key -> AtomicLong. A missing counter is first created at 0.
  Returns the counter's new value.

  Once a counter is in the map it is never replaced. Concurrent callers
  for the same key therefore all add to the same AtomicLong, and no
  increment is lost."
  [m #^String k delta]
  (let [delta (long delta)
        #^AtomicLong counter
        (or (get @m k)
            (get (swap! m (fn [counters]
                            (if (contains? counters k)
                              ;; Another thread created it first; keep it.
                              counters
                              ;; The key is copied, presumably so the map
                              ;; does not retain a larger string that `k`
                              ;; shares storage with (e.g. a substring).
                              (assoc counters (String. k) (AtomicLong. 0)))))
                 k))]
    (.addAndGet counter delta)))
;; Matches a path of the form /ongoing/When/<3 digits>x/<4 digits>/<2 digits>/
;; <2 digits>/<slug>, where the slug contains no spaces or dots (the digit
;; groups presumably encode a date; unconfirmed). In this file it is only
;; referenced from commented-out tallying code.
(def article-re #"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")
;; (defn tally! [{:keys [url-hits url-bytes clients refs s404s]} records]
;; (doseq [{:keys [#^String url bytes client status #^String ref]} records]
;; (if (= status "404")
;; (bump! s404s url 1)
;; (do (bump! url-bytes url bytes)
;; (when (and (.startsWith url "/ongoing/When/")
;; (re-matches article-re url))
;; (bump! url-hits url 1)
;; (bump! clients client 1)
;; (when-not (or (= ref "-")
;; (.startsWith ref "http://www.tbray.org/ongoing/"))
;; (bump! refs ref 1)))))))
;;;; Reporting
(defn sort-by-vals-desc
  "Returns the entries of map `m` as a seq ordered by value, largest first.
  Entries with equal values keep their relative order."
  [m]
  (sort-by (comp - val) m))
(defn take-greatest-vals
  "Returns a vector of the at most `n` entries of map `m` with the greatest
  values, ordered by value descending. Returns nil when `m` is empty.

  Makes a single pass over `m`, keeping a sorted window of at most `n`
  entries. The window is re-sorted only when an entry is admitted: while
  it has fewer than `n` entries, or when the entry's value is at least
  the window's current smallest."
  [n m]
  (when-let [entries (seq m)]
    (reduce (fn [best x]
              (if (or (< (count best) n)
                      (and (pos? (count best))
                           ;; best is sorted descending, so peek is its minimum.
                           (>= (val x) (val (peek best)))))
                (vec (take n (sort-by-vals-desc (conj best x))))
                best))
            []
            entries)))
(defn truncate
  "Returns `s` unchanged when it has at most `n` characters. Otherwise
  returns its first `n` characters followed by \"...\"."
  [#^String s n]
  (if (<= (count s) n)
    s
    (str (subs s 0 n) "...")))
(defn print-top10
  "Prints a \"Top <label>\" heading, then one line for each of the first ten
  [key value] pairs in `results`, then a blank line. Keys are truncated to
  60 characters. With `shrink?`, values are divided by 1024 twice and
  printed with one decimal and an \"M\" suffix. Otherwise they are printed
  as whole numbers."
  [results label & [shrink?]]
  (println "Top" label)
  (let [fmt (if shrink? " %9.1fM: %s" " %10d: %s")
        scale (fn [v] (if shrink? (/ v 1024.0 1024.0) (long v)))]
    (doseq [[k v] (take 10 results)]
      (println (format fmt (scale v) (truncate k 60)))))
  (println))
(defn report
  "Prints one top-10 table per entry of `tallies`. Each entry has the form
  [state-key label & print-top10-options], and its table is built from
  the map held in the atom (state state-key). Tables are computed in
  parallel with pmap and printed in `tallies` order, after a leading
  blank line."
  [tallies state]
  ;; (println (join ", " (map #(str (count @(val %)) " " (name (key %))) state)))
  (println)
  (let [table-args (fn [[state-key & options]]
                     (cons (take-greatest-vals 10 @(state state-key)) options))]
    (doseq [args (pmap table-args tallies)]
      (apply print-top10 args))))
;;;; Main
;; One entry per report table: [state-key label & print-top10-options].
;; wf-atoms creates one atom-held map per state-key. The :shrink option makes
;; print-top10 show values divided by 1024*1024 with an "M" suffix.
(def tallies [[:url-hits "URIs by hit"]
[:url-bytes "URIs by bytes" :shrink]
[:s404s "404s"]
[:clients "client addresses"]
[:refs "referrers"]])
;; Number of worker futures wf-atoms starts to process file chunks.
(def thread-count 64)
(defn wf-atoms
  "Converts `file` line by line into the output file `filename`, then
  prints the tally report (timed).

  `file` is split into line-aligned chunks of roughly 10 MB (at least one
  chunk). A chunker future feeds the chunks into a bounded queue, and
  `thread-count` worker futures run parse-line over every line of each
  chunk. After the last chunk, the chunker enqueues one sentinel per
  worker, inside `finally` so it also happens on error. This lets every
  worker finish instead of blocking on an empty queue. The output file is
  closed before the report is printed."
  [file filename]
  (let [state (zipmap (map first tallies) (repeatedly #(atom {})))]
    (with-open [file-output (java.io.RandomAccessFile. filename "rw")]
      ;; "rw" does not truncate; drop leftovers from any earlier, longer run.
      (.setLength file-output 0)
      (let [chunk-count (max 1 (int (/ (.length (java.io.File. file))
                                       (* 10 1024 1024))))
            chunks (indexed (chunk-file file chunk-count))
            queue (java.util.concurrent.LinkedBlockingQueue.
                   (int (+ chunk-count thread-count)))
            done (Object.)
            chunker (future
                      (try
                        (doseq [chunk chunks]
                          (.put queue chunk))
                        (println "chunking complete.")
                        (finally
                          (dotimes [_ thread-count]
                            (.put queue done)))))
            ;; doall starts every worker now. A lazy seq of futures would only
            ;; start them in batches as it was consumed.
            workers (doall
                     (for [_ (range thread-count)]
                       (future
                         (loop []
                           (let [item (.take queue)]
                             (when-not (identical? item done)
                               (let [[idx [start end]] item
                                     ary (make-array String 12)]
                                 (when (zero? (mod idx 10))
                                   (println (str "Chunk " idx "/" chunk-count " ("
                                                 start " -> " end ")" " Q " (.size queue))))
                                 (dolines file start end
                                          #(parse-line % ary file-output))
                                 ;; Tallying is disabled. Re-enabling it would need
                                 ;; parse-line to return a record map (bound to
                                 ;; `record`) and {:keys [url-hits url-bytes clients
                                 ;; refs s404s]} destructured from `state`:
                                 ;; (let [{:keys [url bytes client status #^String ref]} record]
                                 ;;   (if (= status "404")
                                 ;;     (bump! s404s url 1)
                                 ;;     (do (bump! url-bytes url bytes)
                                 ;;         (when (and (.startsWith #^String url "/ongoing/When/")
                                 ;;                    (re-matches article-re url))
                                 ;;           (bump! url-hits url 1)
                                 ;;           (bump! clients client 1)
                                 ;;           (when-not (or (= ref "-")
                                 ;;                         (.startsWith ref "http://www.tbray.org/ongoing/"))
                                 ;;             (bump! refs ref 1))))))
                                 )
                               (recur)))))))]
        ;; Wait for every worker, then surface any chunker exception.
        (doseq [w workers] @w)
        @chunker))
    (time (report tallies state))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment