Skip to content

Instantly share code, notes, and snippets.

@reedho
Last active November 8, 2023 02:21
Show Gist options
  • Save reedho/ecfd73d4e997e3ed04064b379c512362 to your computer and use it in GitHub Desktop.
Save reedho/ecfd73d4e997e3ed04064b379c512362 to your computer and use it in GitHub Desktop.
Datalevin range-seq with Missionary
{:paths ["src" "resources"]
:deps {org.clojure/clojure {:mvn/version "1.12.0-alpha5"}
missionary/missionary {:mvn/version "b.33"}
datalevin/datalevin {:mvn/version "0.8.21"}
}
:aliases
{:run-m {:main-opts ["-m" "playground.playground"]}
:run-x {:ns-default playground.playground
:exec-fn greet
:exec-args {:name "Clojure"}}
:build {:deps {io.github.clojure/tools.build
{:mvn/version "0.9.6"}}
:ns-default build}
:test {:extra-paths ["test"]
:extra-deps {org.clojure/test.check {:mvn/version "1.1.1"}
io.github.cognitect-labs/test-runner
{:git/tag "v0.5.1" :git/sha "dfb30dd"}}}
:default {:jvm-opts ["--add-opens=java.base/java.nio=ALL-UNNAMED"
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
"-Xmx5g"
"-Duser.timezone=UTC"
"-XX:-OmitStackTraceInFastThrow"
]}}}
(ns playground.playground
(:require [missionary.core :as m]
[datalevin.core :as d]))
(comment
;; prepare data
(def kvdb (d/open-kv "/tmp/db1"))
(d/open-dbi kvdb "table")
;; clear
(d/clear-dbi kvdb "table")
(-> (d/get-range kvdb "table" [:all] :uuid :integer true)
(count))
(d/transact-kv
kvdb
(mapv (fn [n]
[:put "table" (random-uuid) (int n)])
(range 200000)))
(let [v (atom 0)]
(with-open [kv-seq (d/range-seq kvdb "table" [:all] :uuid :integer false)]
(doseq [_x kv-seq]
(swap! v inc)))
@v)
;; close
(d/close-kv kvdb)
)
(defn kv-flow
[db]
(let [kv-seq (d/range-seq db "table" [:all] :uuid :integer false)]
(->> (m/observe
(fn [cb]
(.start
(Thread.
(reify Runnable
(run [_]
(loop [xs (seq kv-seq)]
(if-let [x (first xs)]
(do (cb x)
(recur (try (next (seq xs))
(catch Exception _e
;;(println _e "GOT-EXCEPTION")
nil))))
(cb nil)))
))))
;; cleanup
(bound-fn []
(println "CLEANUP")
(.close kv-seq))))
(m/eduction (take-while some?))
)))
(comment
;; Usage
(->> (kv-flow kvdb)
(m/reduce conj)
(m/?)
(count)) ;; => 200000
(->> (kv-flow kvdb)
(m/eduction (take 3))
(m/reduce conj)
(m/?)
(count)) ;; => 3
)
@reedho
Copy link
Author

reedho commented Nov 8, 2023

As of now, satisfied with below abstraction:

(defn range-seq->flow
  [& range-seq-args]
  (let [kv-seq (apply d/range-seq range-seq-args)]
    (->> (m/observe
           (fn [cb]
             (.start
               (Thread.
                 (reify Runnable
                   (run [_]
                     (loop [xs (seq kv-seq)]
                       (if-let [x (first xs)]
                         (do (cb x)
                             (recur (try (next (seq xs))
                                         (catch Exception _e nil))))
                         (cb nil)))))))
             #(.close kv-seq)))
         (m/eduction (take-while some?)))))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment