@atroche
Last active January 23, 2020 03:15
(ns sunshine.disk-read
  (:require [clojure.core.async :as async :refer [>! <! go-loop chan close! <!!]])
  (:import (java.io BufferedReader FileReader FileInputStream BufferedInputStream InputStreamReader)))
(def ten-gb-filename "ten.json")
(def one-meg (* 1024 1024))
(defn ^FileInputStream input-stream [^String fname]
  (FileInputStream. fname))
(defn count-newlines
  "Counts the bytes equal to 10 (ASCII newline) in the whole of barray."
  [^bytes barray]
  (let [num-bytes (alength barray)]
    (loop [i 0
           newlines 0]
      (if (>= i num-bytes)
        newlines
        (if (= 10 (aget ^bytes barray i))
          (recur (inc i)
                 (inc newlines))
          (recur (inc i)
                 newlines))))))
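;; A minimal single-threaded sketch for sanity-checking the concurrent count
;; below against a small file. `count-newlines-sequentially` is a hypothetical
;; helper, not part of the original gist, and the 64KB buffer size is an
;; arbitrary assumption. Unlike count-newlines, it scans only the bytes that
;; .read actually filled, so a single buffer can be reused safely.
(defn count-newlines-sequentially [^String fname]
  (with-open [in (java.io.FileInputStream. fname)]
    (let [buf (byte-array (* 64 1024))]
      (loop [total 0]
        (let [n (.read in buf)] ;; bytes filled on this read, or -1 on EOF
          (if (neg? n)
            total
            (recur (+ total
                      ;; count newlines in the first n bytes of buf only
                      (loop [i 0, c 0]
                        (if (< i n)
                          (recur (inc i) (if (= 10 (aget buf i)) (inc c) c))
                          c))))))))))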
(with-open [file-stream (FileInputStream. ten-gb-filename)]
  (let [channel (chan 500)
        ;; doall starts all four counters right away; a bare `for` is lazy and
        ;; would only start them once the final reduce realizes the seq
        counters (doall
                  (for [_ (range 4)]
                    (go-loop [newline-count 0]
                      (let [barray (async/<! channel)]
                        (if (nil? barray) ;; nil means the channel is closed and drained
                          newline-count
                          (recur (+ newline-count
                                    (count-newlines barray))))))))]
    (go-loop []
      ;; a fresh zero-filled buffer per read, so a partial read leaves no stale newlines
      (let [barray (byte-array one-meg)
            bytes-read (.read file-stream barray)]
        (if (pos? bytes-read) ;; .read returns -1 on EOF
          (do
            ;; this put parks once 500 buffers (about 500MB) are waiting in the
            ;; channel, so as to not engorge the heap (learnt the hard way)
            (>! channel barray)
            (recur))
          (close! channel))))
    (reduce + (map <!! counters))))