Created
September 29, 2020 07:07
-
-
Save jvillste/22a4466bc4428d4475df6dc9f3dc2e06 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns serialization | |
(:require [clojure.java.io :as io] | |
[taoensso.nippy :as nippy]) | |
(:import [java.io DataInputStream DataOutputStream EOFException] | |
clojure.lang.IReduceInit | |
clojure.lang.IReduce)) | |
(defn with-data-output-stream
  "Opens `file-name` for writing, wraps the resulting output stream in a
  `DataOutputStream`, and calls `function` with that stream as its only
  argument. The stream is closed when `function` returns or throws.
  Returns whatever `function` returns."
  [file-name function]
  (with-open [out (-> file-name
                      io/file
                      io/output-stream
                      DataOutputStream.)]
    (function out)))
(defn write-to-data-output-stream
  "Serializes `sequence` with `nippy/freeze` and appends it to
  `data-output-stream` as a single length-prefixed record: first the byte
  count (via `writeInt`), then the frozen bytes themselves."
  [data-output-stream sequence]
  (let [frozen (nippy/freeze sequence)
        size   (alength frozen)]
    ;; The length prefix lets a reader know how many bytes belong to this record.
    (.writeInt data-output-stream size)
    (.write data-output-stream frozen 0 size)))
(defn write-file
  "Writes `sequence` to `file-name` as a series of length-prefixed records.

  The sequence is split into batches of at most `partition-size` items
  (1000 when not given) and each batch is written as one record with
  `write-to-data-output-stream`. Returns nil."
  ([file-name sequence]
   (write-file file-name 1000 sequence))
  ([file-name partition-size sequence]
   (with-data-output-stream file-name
     (fn [out]
       ;; run! is used purely for its side effects; it returns nil.
       (run! #(write-to-data-output-stream out %)
             (partition-all partition-size sequence))))))
(defn transduce-file
  "Reduces over the values stored in `file-name`.

  The file is read as a series of records, each consisting of an int length
  prefix followed by that many bytes of nippy-frozen data. Every record is
  thawed and, because the transducer is composed with `cat`, its elements are
  fed one by one into the reduction.

  Options (keyword arguments):
    :transducer     transducer applied to the individual elements
                    (defaults to `identity`)
    :reducer        reducing function (defaults to `(constantly nil)`)
    :initial-value  initial accumulator; when absent the reducer is called
                    with no arguments to produce it, or nil is used if no
                    reducer was given

  Reading stops at end of file or as soon as the reducing function returns a
  `reduced` value. The completion arity of the reducing function is called
  on the final accumulator and its result is returned.

  Throws `java.io.EOFException` if the file ends in the middle of a record's
  payload."
  [file-name & {:as options}]
  (let [transducer        (comp cat (or (:transducer options)
                                        identity))
        reducing-function (transducer (or (:reducer options)
                                          (constantly nil)))]
    (with-open [data-input-stream (DataInputStream. (io/input-stream (io/file file-name)))]
      (loop [value  (if (contains? options :initial-value)
                      (:initial-value options)
                      (if (contains? options :reducer)
                        ((:reducer options))
                        nil))
             buffer nil]
        ;; An EOFException while reading the length prefix marks the normal
        ;; end of the file: the try yields nil and the loop finishes.
        (if-let [segment-length (try (.readInt data-input-stream)
                                     (catch EOFException _ nil))]
          ;; Reuse the previous buffer when it is large enough; otherwise
          ;; allocate one with 30% headroom so slightly larger records that
          ;; follow do not force another allocation.
          (let [buffer (if (and buffer
                                (< segment-length (alength ^bytes buffer)))
                         buffer
                         (byte-array (int (* 1.3 segment-length))))]
            ;; readFully blocks until exactly segment-length bytes have been
            ;; read. A plain .read may return after fewer bytes, which would
            ;; hand a partially filled buffer to nippy/thaw and leave the
            ;; stream misaligned for the next length prefix.
            (.readFully data-input-stream buffer 0 segment-length)
            (let [result (reducing-function value (nippy/thaw buffer))]
              (if (reduced? result)
                (reducing-function @result)
                (recur result buffer))))
          (reducing-function value))))))
(defn file-reducible
  "Returns an object implementing `IReduceInit` and `IReduce` whose reduction
  is carried out by `transduce-file` over `file-name`. The file is opened
  anew on every reduction."
  [file-name]
  (reify
    IReduceInit
    (reduce [_ f init]
      (transduce-file file-name
                      :reducer f
                      :initial-value init))

    IReduce
    ;; Without an initial value, transduce-file derives one by calling the
    ;; reducing function with no arguments.
    (reduce [_ f]
      (transduce-file file-name
                      :reducer f))))
(comment
  ;; REPL scratchpad: nothing in this form is evaluated when the namespace
  ;; loads. Evaluate the forms one at a time from an editor.

  (def file-name "temp/test.data")

  ;; Write a small sample file.
  (write-file file-name (range 10))

  ;; Read it back through the keyword-argument API.
  (transduce-file file-name
                  :transducer identity
                  :reducer conj)

  ;; Read it back through the reducible wrapper.
  (into [] (file-reducible file-name))
  (reduce + (file-reducible file-name))

  ;; In-memory counterpart for comparison.
  (into [] (range 10))

  ;; Composing an extra transducer in front of a caller-supplied one.
  (defn transduce-increased [transducer reducer file-name]
    (transduce-file file-name
                    :transducer (comp (map inc)
                                      transducer)
                    :reducer reducer))

  (transduce-increased identity conj file-name)
  (transduce-increased (filter even?) conj file-name)

  ;; The same idea expressed with eduction over the reducible.
  (defn increased-reducible [file-name]
    (eduction (map inc) (file-reducible file-name)))

  (into []
        (filter even?)
        (increased-reducible file-name))

  ;; In-memory counterparts for comparison.
  (into []
        (filter even?)
        (range 10))

  (eduction (filter even?)
            (range 10))

  (filter even?
          (range 10))
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment