Skip to content

Instantly share code, notes, and snippets.

Last active July 17, 2018 21:41
What would you like to do?
process lazy io sequence
{:paths ["."]
{org.clojure/core.async {:mvn/version "0.4.474"}}}
(ns user
(:require [clojure.core.async :as a])
(:import (java.util.concurrent TimeUnit)))
(defn drain!
"Close ch and discard all items on it. Returns nil."
(a/close! ch)
(a/go-loop []
(when-some [_ (a/<! ch)] (recur)))
(defn onto-chan!!
([ch coll] (onto-chan!! ch coll true))
([ch coll close?]
(loop [vs (seq coll)]
(if (and vs (a/>!! ch (first vs)))
(recur (next vs))
(when close?
(a/close! ch)))))))
(defn human-time-str
[^Long elapsed]
(let [minutes (.toMinutes TimeUnit/MILLISECONDS elapsed)
seconds (.toSeconds TimeUnit/MILLISECONDS elapsed)]
(format "%02d min, %02d sec" minutes (mod seconds 60))))
(defn proc
"Start a process that consumes from lazy-io and processes each step in
n threads with xf! It's assumed that xf! does IO and that each step thru
`lazy-io` should return a fully realized thing. Note that this will
use n+1 threads, because a thread is alloted to spool the lazy-io
sequence onto the processing pipeline."
[n xf! on-exit lazy-io]
(let [from (a/chan 1)
_ (onto-chan!! from lazy-io)
abort-fn (memoize
;; ^ prevent multiple calls
(fn abort-fn []
(drain! from)
state (atom {:start (System/currentTimeMillis)
:count 0
:abort-fn abort-fn})
on-done (fn [_]
(println "Finished with " (:count @state) " processed.")
(swap! state (fn [{:keys [start] :as s}]
(let [elapsed (- (System/currentTimeMillis) start)]
(assoc s :time (human-time-str elapsed)))))
(println "Elapsed: " (:time @state))
to (a/chan (a/dropping-buffer 0))
_ (a/take! to on-done) ;; will only take once when closes
ex-handler (fn [e]
(drain! from)
(throw e))]
(a/pipeline-blocking n to xf! from true ex-handler)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment