Skip to content

Instantly share code, notes, and snippets.

@jvillste
Created September 29, 2020 07:07
Show Gist options
  • Save jvillste/22a4466bc4428d4475df6dc9f3dc2e06 to your computer and use it in GitHub Desktop.
Save jvillste/22a4466bc4428d4475df6dc9f3dc2e06 to your computer and use it in GitHub Desktop.
(ns serialization
(:require [clojure.java.io :as io]
[taoensso.nippy :as nippy])
(:import [java.io DataInputStream DataOutputStream EOFException]
clojure.lang.IReduceInit
clojure.lang.IReduce))
(defn with-data-output-stream
  "Opens `file-name` for writing, wraps the resulting output stream in a
  java.io.DataOutputStream and calls `function` with it.

  Returns whatever `function` returns. The stream is closed by `with-open`
  whether `function` returns normally or throws."
  [file-name function]
  (let [file (io/file file-name)]
    (with-open [out (-> file io/output-stream DataOutputStream.)]
      (function out))))
(defn write-to-data-output-stream
  "Serializes `sequence` with `nippy/freeze` and writes it to
  `data-output-stream` as one length-prefixed segment: the number of frozen
  bytes as an int, followed by those bytes. Returns nil."
  [data-output-stream sequence]
  (let [frozen (nippy/freeze sequence)
        frozen-length (alength frozen)]
    (doto data-output-stream
      (.writeInt frozen-length)
      (.write frozen 0 frozen-length))
    nil))
(defn write-file
  "Writes the items of `sequence` to `file-name` in batches of at most
  `partition-size` items (1000 when omitted). Each batch is written as one
  segment by `write-to-data-output-stream`. Returns nil."
  ([file-name sequence]
   (write-file file-name 1000 sequence))
  ([file-name partition-size sequence]
   (with-data-output-stream
     file-name
     (fn [out]
       (run! (partial write-to-data-output-stream out)
             (partition-all partition-size sequence))))))
(defn transduce-file
  "Reduces over every item stored in `file-name`.

  The file is read as a series of segments. Each segment is an int byte count
  followed by that many bytes, which are passed to `nippy/thaw` to get a batch
  of items (the layout produced by `write-to-data-output-stream`). The items of
  each batch are fed one at a time (via `cat`) through the optional transducer
  into the reducer.

  Options:
    :transducer    - transducer applied to individual items (default: identity)
    :reducer       - reducing function (default: (constantly nil))
    :initial-value - initial accumulator. When absent, `((:reducer options))`
                     is used if a reducer was supplied, otherwise nil.

  The completion (1-arity) of the transformed reducing function is called once
  on the final value, including when a step returns a `reduced` value.

  A clean end of file before a length prefix ends the reduction. If the file
  ends inside a segment's bytes, `.readFully` throws java.io.EOFException."
  [file-name & {:as options}]
  (let [transducer (comp cat (or (:transducer options) identity))
        reducing-function (transducer (or (:reducer options) (constantly nil)))
        initial-value (cond
                        (contains? options :initial-value) (:initial-value options)
                        (contains? options :reducer) ((:reducer options))
                        :else nil)]
    (with-open [data-input-stream (DataInputStream. (io/input-stream (io/file file-name)))]
      (loop [value initial-value]
        (if-let [segment-length (try (.readInt data-input-stream)
                                     ;; EOF where a length prefix is expected = no more segments.
                                     (catch EOFException _ nil))]
          ;; Allocate exactly segment-length bytes: nippy/thaw gets the whole
          ;; array, so a larger reused buffer would hand it stale trailing bytes.
          (let [segment (byte-array segment-length)]
            ;; .read may return fewer bytes than requested; .readFully blocks
            ;; until the array is filled (or throws EOFException).
            (.readFully data-input-stream segment)
            (let [result (reducing-function value (nippy/thaw segment))]
              (if (reduced? result)
                (reducing-function @result)
                (recur result))))
          (reducing-function value))))))
(defn file-reducible
  "Returns a reducible over the items stored in `file-name`. Every reduction
  delegates to `transduce-file`, which opens and reads the file.

  Follows the `clojure.core/reduce` contract, so the result works with
  `reduce`, `into`, `transduce` and `eduction`:
    - (reduce f init r) steps `f` over the items starting from `init`. The
      completion (1-arity) of `f` is not called; callers such as `transduce`
      call it themselves. A `reduced` result is returned unwrapped.
    - (reduce f r) uses the first item as the initial value, or returns (f)
      when the file contains no items."
  [file-name]
  (reify
    IReduceInit
    (reduce [_ reducing-function initial-value]
      ;; transduce-file calls the completion arity of its reducer; completing
      ;; with identity turns that into a no-op so completion is not applied
      ;; twice when this reducible is consumed by transduce/eduction.
      (transduce-file file-name
                      :reducer (completing reducing-function identity)
                      :initial-value initial-value))
    IReduce
    (reduce [this reducing-function]
      ;; Private sentinel marking "no item seen yet", so the first item can
      ;; become the initial accumulator as clojure.core/reduce specifies.
      (let [no-value (Object.)
            result (clojure.core/reduce (fn [accumulator item]
                                          (if (identical? accumulator no-value)
                                            item
                                            (reducing-function accumulator item)))
                                        no-value
                                        this)]
        (if (identical? result no-value)
          (reducing-function)
          result)))))
(comment
  ;; REPL session: evaluate forms one at a time.
  (def file-name "temp/test.data")

  ;; Write ten numbers, then read them back in several ways.
  (write-file file-name (range 10))

  (transduce-file file-name
                  :transducer identity
                  :reducer conj)

  (into [] (file-reducible file-name))

  (reduce + (file-reducible file-name))

  ;; The same operation on an in-memory collection, for comparison.
  (into [] (range 10))

  ;; Passing a caller-supplied transducer through to transduce-file,
  ;; after incrementing every item.
  (defn transduce-increased [xform rf path]
    (transduce-file path
                    :transducer (comp (map inc) xform)
                    :reducer rf))

  (transduce-increased identity conj file-name)

  (transduce-increased (filter even?) conj file-name)

  ;; The same idea expressed with an eduction over the file reducible.
  (defn increased-reducible [path]
    (eduction (map inc) (file-reducible path)))

  (into [] (filter even?) (increased-reducible file-name))

  ;; In-memory equivalents: into with a transducer, an eduction, and a lazy seq.
  (into [] (filter even?) (range 10))

  (eduction (filter even?) (range 10))

  (filter even? (range 10))
  )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment