Skip to content

Instantly share code, notes, and snippets.

@abdullin
Created January 26, 2016 20:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abdullin/c6128631ce0740f84b0a to your computer and use it in GitHub Desktop.
Save abdullin/c6128631ce0740f84b0a to your computer and use it in GitHub Desktop.
Sample Clojure code for searching log entries stored in chunks in a custom binary format on Windows Azure
(ns azurelog.reader
(:require [clj-time.core :as t]
[clj-time.format :as f]
[clojure.java.io :as io])
(:import [com.microsoft.azure.storage CloudStorageAccount]
[java.nio ByteOrder ByteBuffer]
[com.microsoft.azure.storage.blob]
[java.util.zip GZIPInputStream]
[org.apache.commons.io.input BoundedInputStream]))
(defn conn-str
  "Builds the storage connection string from `config`, using its
  :name value as the account name and its :key value as the account key."
  [config]
  (let [account (:name config)
        secret  (:key config)]
    (str "DefaultEndpointsProtocol=http;"
         "AccountName=" account
         ";AccountKey=" secret
         ";")))
(defn blob-client
  "Creates a blob client for the storage account described by `config`.
  The connection string produced by `conn-str` is parsed into a
  CloudStorageAccount, from which the client is created."
  [config]
  (-> config
      conn-str
      CloudStorageAccount/parse
      .createCloudBlobClient))
(defn get-logs-container
  "Returns the reference to the \"sv-shippedlogs\" container obtained from `client`."
  [client]
  (let [container-name "sv-shippedlogs"]
    (.getContainerReference client container-name)))
(defn range->days
  "Returns a lazy sequence of date-times starting at `from` and stepping by
  one day, for as long as each value is strictly before `to`."
  [from to]
  (let [next-day  (fn [d] (t/plus d (t/days 1)))
        in-range? (fn [d] (t/before? d to))]
    (->> from
         (iterate next-day)
         (take-while in-range?))))
(defn date->hours
  "Returns a lazy sequence of 24 date-times: `date` plus 0, 1, ... 23 hours."
  [date]
  (for [h (range 24)]
    (t/plus date (t/hours h))))
;; clj-time formatter with the pattern "yyyyMMdd-HH"; `log-blob-name` uses it
;; to render the date part of a log blob name.
(def date-format (f/formatter "yyyyMMdd-HH"))
(defn log-blob-name
  "Returns the blob name for `ctx` and `date`:
  `ctx`, a slash, `date` rendered with `date-format`, then \".log.gz\"."
  [ctx date]
  (let [stamp (f/unparse date-format date)]
    (str ctx "/" stamp ".log.gz")))
(defn all-log-files
  "Returns a lazy sequence of page-blob references in `container`, one per
  hour of every day produced by `range->days` for `start` and `end`.
  Blob names are built by `log-blob-name` from `ctx` and the hour's date-time."
  [container ctx start end]
  (let [hours    (mapcat date->hours (range->days start end))
        blob-ref (fn [hour]
                   (.getPageBlobReference container (log-blob-name ctx hour)))]
    (map blob-ref hours)))
(defn read-chunk-length!
  "Reads the 6-byte chunk header from input stream `s` and returns the int
  stored in it.

  The header is decoded as little-endian: a 2-byte version, which must be 1,
  followed by a 4-byte int, which is returned. On success exactly 6 bytes
  are consumed from `s`.

  Throws java.io.EOFException when the stream ends before all 6 header bytes
  are available, and AssertionError when the version is not 1."
  [s]
  (let [header-size 6
        bs (byte-array header-size)]
    ;; A single InputStream.read call may deliver fewer bytes than requested,
    ;; so keep reading until the whole header is filled.
    (loop [off 0]
      (when (< off header-size)
        (let [n (long (.read s bs off (- header-size off)))]
          (when (neg? n)
            (throw (java.io.EOFException.
                     (str "chunk header truncated after " off
                          " of " header-size " bytes"))))
          (recur (+ off n)))))
    (let [buf (doto (ByteBuffer/wrap bs)
                (.order ByteOrder/LITTLE_ENDIAN))]
      (assert (= 1 (.getShort buf)) "unexpected version")
      (.getInt buf))))
(defn matches-query?
  "Returns true when at least one string in `s` contains `query` as a
  substring; otherwise returns nil."
  [s query]
  (let [hit? (fn [line] (.contains line query))]
    (some hit? s)))
(defn date-line?
  "Returns the matched timestamp prefix when `s` starts with a
  `yyyy-MM-dd HH:mm:ss.fraction +00:00` style stamp, otherwise nil."
  [s]
  (let [timestamp-prefix #"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+ \+00:00"]
    (re-find timestamp-prefix s)))
(defn group-entries
  "Splits `coll` into groups and returns them as a lazy sequence.

  Items are accumulated until one satisfies `pred`; at that point the group
  collected so far is emitted (in original order) and a new group is started
  with the matching item. `agg` holds the already-accumulated items of the
  current group, most recent first. The final group is emitted when `coll`
  is exhausted.

  Note that the group pending before the first match is always emitted, so
  starting with an empty `agg` and a first item that satisfies `pred` yields
  a leading empty group."
  [pred coll agg]
  (lazy-seq
    (loop [remaining coll
           pending   agg]
      (let [xs (seq remaining)]
        (cond
          ;; Input exhausted: flush whatever is pending as the last group.
          (nil? xs)
          (list (reverse pending))

          ;; Boundary item: emit the pending group, start a new one lazily.
          (pred (first xs))
          (cons (reverse pending)
                (group-entries pred (rest xs) (list (first xs))))

          ;; Continuation item: push onto the pending group and keep scanning.
          :else
          (recur (rest xs) (cons (first xs) pending)))))))
(defn get-matching-entries
  "Groups the lines in `s` with `group-entries`, using `date-line?` as the
  boundary predicate, and lazily keeps only the groups for which
  `matches-query?` finds `query`."
  [s query]
  (let [relevant? (fn [entry] (matches-query? entry query))]
    (->> (group-entries date-line? s [])
         (filter relevant?))))
(defn search-blob
  "Searches one blob for entries matching `query`.

  Prints a progress line first. When the blob exists, opens its input
  stream, reads the chunk length with `read-chunk-length!`, limits the stream
  to that many bytes, gunzips it and scans it line by line. Returns the fully
  realized sequence from `get-matching-entries`, or nil when the blob does
  not exist."
  [ref query]
  (printf "Loading blob %s to search for '%s'\n" (.getName ref) query)
  (when (.exists ref)
    (with-open [raw (.openInputStream ref)
                rdr (io/reader
                      (GZIPInputStream.
                        ;; The header is consumed from `raw` before it is bounded.
                        (BoundedInputStream. raw (read-chunk-length! raw))))]
      ;; Realize the result while the reader is still open.
      (doall (get-matching-entries (line-seq rdr) query)))))
(defn get-container
  "Returns the logs container reference (see `get-logs-container`) for a blob
  client built with `blob-client`.

  With no arguments the storage settings come from the var `config`.
  NOTE(review): `config` is not defined in the visible part of this file;
  confirm where it is expected to come from.

  With one argument, `cfg` is used as the storage settings instead, so callers
  do not have to rely on that global."
  ([] (get-container config))
  ([cfg] (get-logs-container (blob-client cfg))))
(defn search-container
  "Searches the log blobs of `container` described by `req`.

  `req` is a map with :start, :end, :context and :query. The blobs come from
  `all-log-files`; each is searched with `search-blob` via `pmap`. Prints a
  progress line and returns the lazy sequence of per-blob results."
  [container req]
  (let [{:keys [start end query]} req
        ctx        (:context req)
        blobs      (all-log-files container ctx start end)
        search-one (fn [blob] (search-blob blob query))]
    (printf "Searching '%s' in %s from %s to %s\n" query ctx start end)
    (pmap search-one blobs)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment