;;; Clojure chunked binary sequences
;; Namespace for the byte-chunk-seq benchmark. The stream classes all live in
;; java.io; the package prefix must be the first element of the import list,
;; otherwise `InputStream` itself is treated as the package name.
(ns test-clj-byte-chunk-seq
  (:import (java.io InputStream OutputStream
                    FileInputStream FileOutputStream)))
;; Have the compiler report every interop call it cannot resolve statically.
(set! *warn-on-reflection* true)

;; 2^20 = 1048576, the number of bytes in one mebibyte.
(def ^:const ONE_MEG (bit-shift-left 1 20))
;; A chunk (in the clojure.lang.IChunk sense) viewing the half-open index
;; range [offset, end) of a primitive byte array. The array is shared, not
;; copied; dropFirst returns a new view over the same array.
(deftype ByteArrayChunk [^bytes array ^int offset ^int end]

  clojure.lang.IChunk
  ;; Returns a chunk without the first byte; throws IllegalStateException
  ;; when the chunk is already empty.
  (dropFirst [this]
    (if (= offset end)
      (throw (IllegalStateException. "dropFirst of empty ByteArrayChunk"))
      (ByteArrayChunk. array (inc offset) end)))
  ;; Left-folds f over the bytes of the chunk, seeded with start.
  (reduce [this f start]
    (if (< offset end)
      (loop [ret (f start (aget array offset))
             i (inc offset)]
        (if (< i end)
          (recur (f ret (aget array i)) (inc i))
          ret))
      ;; Empty chunk: nothing to fold, and array[offset] would lie outside
      ;; the chunk's range, so hand back the seed untouched.
      start))

  clojure.lang.Indexed
  ;; i is relative to the start of the chunk, not to the backing array.
  (nth [this i]
    (aget array (+ offset i)))
  ;; Bounds-checked variant: returns not-found when i is outside the chunk.
  (nth [this i not-found]
    (if (and (>= i 0) (< i (.count this)))
      (.nth this i)
      not-found))

  clojure.lang.Counted
  ;; Number of bytes visible through this chunk.
  (count [this]
    (- end offset)))
(defn byte-array-chunk
  "Wraps a byte array in a ByteArrayChunk without copying it.

  array            - the backing byte array
  offset           - index of the first byte in the chunk (default 0)
  end              - index one past the last byte (default: (count array))"
  ([array]
   (ByteArrayChunk. array 0 (count array)))
  ([array offset]
   (ByteArrayChunk. array offset (count array)))
  ([array offset end]
   (ByteArrayChunk. array offset end)))
(defn read-bytes-chunked
  "Returns a lazy, chunked seq of the bytes read from stream.

  Each step allocates a fresh buffer of chunk-size bytes, performs one
  `.read` into it, and emits the bytes actually read as one chunk. The seq
  ends when `.read` reports no bytes (zero or a negative count).

  The stream is read only as the seq is realized and is not closed here, so
  the caller must keep it open until the seq has been consumed."
  [^InputStream stream chunk-size]
  (lazy-seq
    (let [buffer (byte-array chunk-size)
          c (.read stream buffer)]
      (when (pos? c)
        ;; `.read` may fill only part of the buffer; limit the chunk to the
        ;; c bytes that were really read so no stale bytes are emitted.
        (let [b (byte-array-chunk buffer 0 c)]
          (chunk-cons b (read-bytes-chunked stream chunk-size)))))))
(defn write-bytes-chunked
  "Writes every chunk of the chunked seq coll to stream, one `.write` call
  per chunk, and returns nil. Each chunk must be a ByteArrayChunk; this is
  checked with an assert before writing."
  [^OutputStream stream coll]
  (loop [remaining (seq coll)]
    (when remaining
      (let [^ByteArrayChunk chunk (chunk-first remaining)]
        (assert (instance? ByteArrayChunk chunk))
        ;; Write straight from the backing array: no per-byte iteration.
        (.write stream (.array chunk) (.offset chunk) (count chunk)))
      (recur (seq (chunk-rest remaining))))))
(defn chunk-take
  "Returns a lazy seq of the first n elements of the chunked seq coll,
  passing whole chunks through unchanged wherever possible.

  While n is at least the size of the next chunk, that chunk is re-emitted
  as is. If n runs out in the middle of a chunk, the remaining elements are
  produced with `take`, i.e. element by element rather than as a chunk.
  Returns an empty seq when n is not positive or coll is empty."
  [n coll]
  (lazy-seq
    (when (pos? n)
      (when-let [s (seq coll)]
        (let [c (chunk-first s)
              size (count c)]
          (if (< n size)
            (take n s)
            (chunk-cons c
                        (chunk-take (- n size) (chunk-rest s)))))))))
;; Benchmark driver: for each block size, copy (ONE_MEG * blocksize) bytes
;; from /dev/zero to /dev/null through the chunked seq, five times, and print
;; one CSV row per run.
(println "process,block size (bytes),byte count,time (sec),bytes/sec")
(doseq [blocksize [65536 32768 16384 8192 4096 2048 1024 512 256]]
  (let [total-bytes (* ONE_MEG blocksize)]
    (dotimes [j 5]
      (print (str "clj+byte-chunk-seq," blocksize "," total-bytes ","))
      (let [start-time (System/nanoTime)]
        ;; with-open closes both streams before the end time is taken.
        (with-open [in (FileInputStream. "/dev/zero")
                    out (FileOutputStream. "/dev/null")]
          (write-bytes-chunked out
                               (chunk-take total-bytes
                                           (read-bytes-chunked in blocksize))))
        (let [end-time (System/nanoTime)
              ;; nanoseconds -> seconds
              elapsed (/ (- end-time start-time) 1000000000.0)]
          (printf "%.6f," elapsed)
          (printf "%d\n" (long (/ total-bytes elapsed)))
          ;; print/printf do not flush; push each row out as it completes.
          (flush))))))
;;; Sample output
;; process,block size (bytes),byte count,time (sec),bytes/sec
;; clj+byte-chunk-seq,4096,4294967296,4.384292,979626196
;; clj+byte-chunk-seq,4096,4294967296,4.053077,1059680656
;; clj+byte-chunk-seq,4096,4294967296,4.058464,1058274089
;; clj+byte-chunk-seq,4096,4294967296,4.044335,1061971200
;; clj+byte-chunk-seq,4096,4294967296,4.051130,1060189946
;; clj+byte-chunk-seq,2048,2147483648,3.149845,681774388
;; clj+byte-chunk-seq,2048,2147483648,3.144579,682916106
;; clj+byte-chunk-seq,2048,2147483648,3.161608,679237795
;; clj+byte-chunk-seq,2048,2147483648,3.162058,679141131
;; clj+byte-chunk-seq,2048,2147483648,3.148087,682155114
;; clj+byte-chunk-seq,1024,1073741824,2.725380,393978756
;; clj+byte-chunk-seq,1024,1073741824,2.725009,394032395
;; clj+byte-chunk-seq,1024,1073741824,2.733059,392871805
;; clj+byte-chunk-seq,1024,1073741824,2.728850,393477774
;; clj+byte-chunk-seq,1024,1073741824,2.725437,393970517
;; clj+byte-chunk-seq,512,536870912,2.540585,211317831
;; clj+byte-chunk-seq,512,536870912,2.550349,210508801
;; clj+byte-chunk-seq,512,536870912,2.538364,211502728
;; clj+byte-chunk-seq,512,536870912,2.543661,211062288
;; clj+byte-chunk-seq,512,536870912,2.548477,210663432
;; clj+byte-chunk-seq,256,268435456,2.474146,108496206
;; clj+byte-chunk-seq,256,268435456,2.470040,108676562
;; clj+byte-chunk-seq,256,268435456,2.464580,108917323
;; clj+byte-chunk-seq,256,268435456,2.464933,108901725
;; clj+byte-chunk-seq,256,268435456,2.471308,108620801
