Last active
July 17, 2018 21:41
-
-
Save uwo/9a6de35d17c73337ffde3dc2c5f53c54 to your computer and use it in GitHub Desktop.
process lazy io sequence
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{:paths ["."] | |
:deps | |
{org.clojure/core.async {:mvn/version "0.4.474"}}} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns user | |
(:require [clojure.core.async :as a]) | |
(:import (java.util.concurrent TimeUnit))) | |
(defn drain!
  "Closes `ch`, then starts a go block that keeps taking from it until a
  take yields nil, throwing every taken value away.

  The draining happens asynchronously; this function itself returns nil
  right after the go block has been started."
  [ch]
  (a/close! ch)
  ;; Values already sitting on the closed channel can still be taken, so
  ;; keep taking until the channel reports exhaustion with nil.
  (a/go
    (loop []
      (when (some? (a/<! ch))
        (recur))))
  nil)
(defn onto-chan!!
  "Feeds the elements of `coll` onto `ch` one at a time using blocking puts,
  on a thread started with `a/thread`.

  Feeding stops when `coll` is exhausted or as soon as a put returns a
  falsey value. At that point `ch` is closed when `close?` is truthy
  (the two-argument arity passes true).

  Returns the channel produced by `a/thread`."
  ([ch coll]
   (onto-chan!! ch coll true))
  ([ch coll close?]
   (a/thread
     (loop [remaining (seq coll)]
       (cond
         ;; Something is left and the put was accepted: move to the rest.
         (and remaining (a/>!! ch (first remaining)))
         (recur (next remaining))

         ;; Either exhausted or the put was refused: optionally close.
         close?
         (a/close! ch))))))
(defn human-time-str
  "Formats `elapsed`, a duration in milliseconds, as \"MM min, SS sec\".

  The minutes part is the whole number of minutes in the duration (it is
  not wrapped at 60); the seconds part is the whole seconds remaining
  within the current minute. Both are zero-padded to at least two digits."
  [^Long elapsed]
  (let [unit          TimeUnit/MILLISECONDS
        total-seconds (.toSeconds unit elapsed)
        whole-minutes (.toMinutes unit elapsed)
        leftover-secs (mod total-seconds 60)]
    (format "%02d min, %02d sec" whole-minutes leftover-secs)))
(defn proc
  "Start a process that consumes from `lazy-io` and processes each step in
  `n` threads with `xf!`. It's assumed that `xf!` does IO and that each step
  through `lazy-io` returns a fully realized thing. Note that this uses n+1
  threads, because one thread is allotted to spool the lazy-io sequence
  onto the processing pipeline.

  Arguments:
    n        - parallelism passed to `a/pipeline-blocking`.
    xf!      - transducer applied to every item; whatever it emits is
               discarded, since the output channel never retains values.
    on-exit  - zero-argument fn. It is invoked at most once, by whichever
               of these happens first: the pipeline finishing, the abort
               fn being called, or the pipeline's exception handler firing.
    lazy-io  - the sequence to spool onto the pipeline.

  Returns an atom holding a map with:
    :start    - start time, epoch milliseconds.
    :count    - starts at 0; this function never updates it.
    :abort-fn - zero-argument fn that closes and drains the input channel
                and triggers `on-exit`. Safe to call repeatedly.
    :time     - added once the output channel closes; the elapsed time as
                formatted by `human-time-str`."
  [n xf! on-exit lazy-io]
  (let [from       (a/chan 1)
        _          (onto-chan!! from lazy-io)
        ;; Every exit path funnels through this delay. A delay runs its body
        ;; at most once even when forced from several threads, so `on-exit`
        ;; cannot fire twice when an abort or an exception is followed by
        ;; the normal completion callback, or when several workers fail at
        ;; the same time.
        exit-once  (delay (on-exit))
        abort-fn   (fn abort-fn []
                     (drain! from)
                     (force exit-once)
                     nil)
        state      (atom {:start    (System/currentTimeMillis)
                          :count    0
                          :abort-fn abort-fn})
        on-done    (fn [_]
                     (println "Finished with " (:count @state) " processed.")
                     (swap! state (fn [{:keys [start] :as s}]
                                    (let [elapsed (- (System/currentTimeMillis) start)]
                                      (assoc s :time (human-time-str elapsed)))))
                     (println "Elapsed: " (:time @state))
                     (force exit-once)
                     nil)
        ;; A zero-capacity dropping buffer accepts every put and keeps
        ;; nothing, so the only take that can ever complete on `to` is the
        ;; nil delivered when the pipeline closes it.
        to         (a/chan (a/dropping-buffer 0))
        _          (a/take! to on-done)
        ex-handler (fn [e]
                     (abort-fn)
                     ;; NOTE(review): rethrowing is kept from the original;
                     ;; the exception escapes on the pipeline worker thread
                     ;; rather than being returned to the pipeline - confirm
                     ;; this is the intended way to surface failures.
                     (throw e))]
    (a/pipeline-blocking n to xf! from true ex-handler)
    state))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment