This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns wf | |
(:require (clojure.java.io/writer)) | |
(:use [clojure.contrib.seq :only (indexed)]) | |
(:import java.util.concurrent.atomic.AtomicLong)) | |
(set! *warn-on-reflection* true) | |
;;; Big wins: | |
;;; * Use queue instead of pmap. | |
;;; * Overlap chunking with actual runtime | |
;;;; Reading | |
(defn chunk-file
  "Splits the file `filename` into `n` consecutive byte ranges whose
  boundaries fall immediately after a newline byte, so that no line straddles
  two ranges.

  Returns a sequence of (start end) byte-offset pairs. The first range starts
  at 0 and the last one ends at the file length. A range is empty (start =
  end) when one line covers more than one nominal boundary.

  The offsets are computed eagerly and the file is closed before returning."
  [filename n]
  (println "chunking...")
  (with-open [#^java.io.RandomAccessFile file (java.io.RandomAccessFile. filename "r")]
    (let [len (.length file)
          eol (int \newline)
          ;; Offset of the first line start at or after the i-th nominal
          ;; boundary (i * len / n, truncated to a whole byte).
          boundary (fn [i]
                     (if (zero? i)
                       0
                       (do (.seek file (long (quot (* i len) n)))
                           ;; Skip the rest of the current line. `.read`
                           ;; returns -1 at end of file, which must also end
                           ;; the scan or the loop would never terminate.
                           (loop []
                             (let [b (.read file)]
                               (when-not (or (neg? b) (== b eol))
                                 (recur))))
                           (.getFilePointer file))))
          offsets (conj (vec (map boundary (range n))) len)]
      (partition 2 1 offsets))))
;;; Writing | |
(comment | |
(defn write-line [file line] | |
(let [ary (make-array String 6) | |
fields (dumbest-split "a\tb\tc\tb" \tab ary)] | |
(map fields)) | |
) | |
) | |
(defn dolines
  "Calls `f` on every line of `file` that starts inside the byte range
  [start-byte, end-byte).

  `start-byte` is expected to be the offset of the first byte of a line. The
  file is decoded as US-ASCII, so one character corresponds to one byte.
  `.readLine` strips the line terminator, so each line is charged its length
  plus one byte for the terminator. That accounting is exact for LF line
  endings; with CRLF endings it under-counts by one byte per line.

  Returns nil."
  [#^String file start-byte end-byte f]
  (with-open [stream (java.io.FileInputStream. file)]
    (.skip stream start-byte)
    (let [rdr (-> stream
                  (java.io.BufferedInputStream. (* 8 131072))
                  (java.io.InputStreamReader. "US-ASCII")
                  (java.io.BufferedReader. 131072))]
      (loop [remaining (- end-byte start-byte)]
        ;; Stop as soon as the budget is used up: a line starting exactly at
        ;; end-byte belongs to the next range.
        (when (pos? remaining)
          (when-let [#^String line (.readLine rdr)]
            (f line)
            (recur (- remaining (inc (.length line))))))))))
(defn #^"[Ljava.lang.String;" dumbest-split
  "Splits `s` on the character `c`, storing the pieces into the caller-supplied
  String array `tokens`, and returns that same array.

  At most (alength tokens) pieces are produced; when `s` contains more
  separators than that, the last slot receives the whole remainder of the
  string, separators included. Slots after the last piece are set to nil so
  that an array reused across calls never exposes pieces of an earlier string.
  A zero-length array is returned untouched."
  [#^String s c #^"[Ljava.lang.String;" tokens]
  (let [size (int (alength tokens))
        last-slot (dec size)
        sep (int c)]
    (if (zero? size)
      tokens
      (loop [start (int 0)
             i (int 0)]
        (let [idx (int (.indexOf s sep (int start)))]
          (if (or (neg? idx) (>= i last-slot))
            (do (aset tokens i (.substring s (int start)))
                ;; Clear whatever a previous call left in the unused slots.
                (loop [j (inc i)]
                  (when (< j size)
                    (aset tokens j nil)
                    (recur (inc j))))
                tokens)
            (do (aset tokens i (.substring s (int start) idx))
                (recur (inc idx) (inc i)))))))))
(def #^{:doc "Maps the zero-based position of a field within a record to the
  keyword naming that field."}
  template
  (zipmap (range)
          [:datetime
           :recordtype
           :msisn
           :imei
           :otherparty
           :typenumber
           :callduration
           :cellid]))
(comment | |
;; | |
(let | |
[real ["25 Aug 2003 09:02:32" "MTC" "78652964404" "3511034004353504" "" "" "50" "11972"]] | |
(apply str (interpose "; " (map #(get %1 1) | |
(map-indexed | |
(fn [index value] | |
(let [type (get template index)] | |
[type value])) real))))) | |
) | |
(defn parse-line
  "Re-formats one tab-separated `line` and appends it to `fo`.

  The line is split on tab characters into the reusable String array `ary`
  (see dumbest-split); the non-nil fields are joined with \"; \" and the
  result, followed by CR LF, is written to `fo` with `.writeBytes`.
  Unused (nil) slots of `ary` are skipped so that a token array larger than
  the record does not produce trailing separators.

  Returns nil."
  [line ary #^java.io.DataOutput fo]
  (let [fields (dumbest-split line \tab ary)
        record (apply str (interpose "; " (remove nil? fields)))]
    ;; wf-atoms hands the same output file to all of its worker futures, so
    ;; each record is written under the file's monitor to keep records whole.
    (locking fo
      (.writeBytes fo (str record "\r\n")))))
;; (when (and (= (aget fields 5) "\"GET") | |
;; ('#{"200" "304" "404"} status)) | |
;; {:client (aget fields 0) | |
;; :url (aget fields 6) | |
;; :status status | |
;; :bytes (if (= bytes "-") 0 (Long/parseLong bytes)) | |
;; :ref (.substring ref 1 (dec (count ref)))})) | |
;;;; Tallying | |
(defn bump!
  "Adds `delta` to the counter stored under the string key `k` in `m`, an atom
  holding a map from String to AtomicLong, creating the counter on first use.

  Returns the counter's value after the addition.

  An AtomicLong that has been published in the map is never replaced: a
  missing key is installed with a zero counter via swap!, and the increment
  is then applied to whichever counter the map ends up holding. This keeps
  concurrent callers from losing updates on a counter they already fetched."
  [m #^String k delta]
  (let [delta (long delta)]
    (if-let [#^AtomicLong l (get @m k)]
      (.addAndGet l delta)
      (let [counters (swap! m #(if (contains? % k)
                                 %
                                 ;; Store a copy of the key, as the original
                                 ;; code did.
                                 (assoc % (String. k) (AtomicLong. 0))))
            #^AtomicLong l (get counters k)]
        (.addAndGet l delta)))))
(def #^{:doc "Regular expression for paths of the form
  /ongoing/When/NNNx/NNNN/NN/NN/name, where N is a digit and name contains
  neither spaces nor dots."}
  article-re
  #"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")
;; (defn tally! [{:keys [url-hits url-bytes clients refs s404s]} records] | |
;; (doseq [{:keys [#^String url bytes client status #^String ref]} records] | |
;; (if (= status "404") | |
;; (bump! s404s url 1) | |
;; (do (bump! url-bytes url bytes) | |
;; (when (and (.startsWith url "/ongoing/When/") | |
;; (re-matches article-re url)) | |
;; (bump! url-hits url 1) | |
;; (bump! clients client 1) | |
;; (when-not (or (= ref "-") | |
;; (.startsWith ref "http://www.tbray.org/ongoing/")) | |
;; (bump! refs ref 1))))))) | |
;;;; Reporting | |
(defn sort-by-vals-desc
  "Returns the entries of `m` ordered from the largest value to the smallest.
  Values must be numeric because each one is negated to build the sort key."
  [m]
  (let [negated-val (comp - val)]
    (sort-by negated-val m)))
(defn take-greatest-vals
  "Returns a vector of the (up to) `n` entries of `m` with the greatest values,
  ordered from greatest to smallest, or nil when `m` is empty.

  Only a running best-`n` vector is kept sorted, rather than sorting all of
  `m`. `n` is expected to be positive."
  [n m]
  (when-let [entries (seq m)]
    (reduce (fn [best x]
              ;; Accept x while there is still room, or when it beats the
              ;; current smallest kept entry. An entry that merely ties the
              ;; smallest one would sort after it and be cut off again, so it
              ;; is skipped without re-sorting.
              (if (or (< (count best) n)
                      (> (val x) (val (peek best))))
                (vec (take n (sort-by-vals-desc (conj best x))))
                best))
            [(first entries)]
            (rest entries))))
(defn truncate
  "Returns `s` unchanged when it has at most `n` characters; otherwise returns
  its first `n` characters followed by \"...\"."
  [#^String s n]
  (let [len (count s)]
    (if (<= len n)
      s
      (str (subs s 0 n) "..."))))
(defn print-top10
  "Prints a \"Top <label>\" heading, then one formatted line for each of the
  first ten [key value] pairs in `results`, then a blank line.

  With a truthy optional `shrink?` argument each value is divided by 1024
  twice and printed with one decimal and an M suffix; otherwise it is printed
  as an integer. Keys are shortened with truncate to 60 characters."
  [results label & [shrink?]]
  (println "Top" label)
  (let [[fmt scale] (if shrink?
                      [" %9.1fM: %s" #(/ % 1024.0 1024.0)]
                      [" %10d: %s" long])]
    (doseq [[k v] (take 10 results)]
      (println (format fmt (scale v) (truncate k 60)))))
  (println))
(defn report
  "Prints a blank line followed by a top-ten listing for every entry of
  `tallies`.

  Each entry is a vector whose first element is a key into `state` (a map of
  atoms holding the tally maps) and whose remaining elements are passed on to
  print-top10 after the computed top-ten entries. The top-ten computation for
  the individual tallies is done with pmap."
  [tallies state]
  (println)
  (let [top-for (fn [[tally-key & print-args]]
                  (cons (take-greatest-vals 10 @(state tally-key))
                        print-args))]
    (doseq [args (pmap top-for tallies)]
      (apply print-top10 args))))
;;;; Main | |
(def #^{:doc "Report sections. Each entry is [state-key label] with an optional
  third element; report passes everything after the key on to print-top10, so
  a third element acts as that function's shrink? flag."}
  tallies
  [[:url-hits  "URIs by hit"]
   [:url-bytes "URIs by bytes" :shrink]
   [:s404s     "404s"]
   [:clients   "client addresses"]
   [:refs      "referrers"]])

(def #^{:doc "Number of worker futures started by wf-atoms."}
  thread-count
  64)
(defn wf-atoms
  "Re-formats every line of the input file `file` into the output file
  `filename` using `thread-count` worker futures, then prints the timed tally
  report.

  A chunker future splits the input with chunk-file (about one chunk per
  10 MiB of input, at least one) and puts the indexed chunks on a bounded
  queue. Each worker repeatedly takes a chunk and runs parse-line over the
  chunk's lines via dolines. After the last chunk the chunker enqueues one
  sentinel per worker and a worker stops when it takes a sentinel, so no
  worker can be left blocked on an empty queue.

  The tally atoms in `state` are created but never updated here (the tallying
  code is commented out in this file), so the report currently prints only
  its headings. Returns nil."
  [file filename]
  (let [chunk-bytes (* 10 1024 1024)
        chunk-count (int (max 1 (quot (.length (java.io.File. file)) chunk-bytes)))
        state (zipmap (map first tallies) (repeatedly #(atom {})))
        ;; Capacity covers every chunk plus one sentinel per worker, so the
        ;; chunker's puts do not depend on workers draining the queue.
        queue (java.util.concurrent.LinkedBlockingQueue.
               (int (+ chunk-count thread-count)))
        done (Object.)]
    ;; with-open closes the output file even when a worker or the chunker
    ;; fails.
    (with-open [file-output (java.io.RandomAccessFile. filename "rw")]
      (let [chunker (future
                      (try
                        (doseq [chunk (indexed (chunk-file file chunk-count))]
                          (.put queue chunk))
                        (println "chunking complete.")
                        (finally
                          ;; Always release the workers, even if chunking
                          ;; failed part-way through.
                          (dotimes [_ thread-count]
                            (.put queue done)))))
            process-chunk (fn [[idx [start end]]]
                            (when (zero? (mod idx 10))
                              (println (str "Chunk " idx "/" chunk-count " ("
                                            start " -> " end ")" " Q " (.size queue))))
                            ;; One token array per chunk, reused for each line.
                            (let [ary (make-array String 12)]
                              (dolines file start end
                                       #(parse-line % ary file-output))))
            ;; doall starts every future immediately; a lazy seq of futures
            ;; would only be started batch by batch as it is dereferenced.
            workers (doall
                     (for [_ (range thread-count)]
                       (future
                         (loop []
                           (let [item (.take queue)]
                             (when-not (identical? item done)
                               (process-chunk item)
                               (recur)))))))]
        (doseq [worker workers]
          (deref worker))
        ;; Surfaces any exception thrown while chunking.
        (deref chunker)))
    (time (report tallies state))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment