;; xtdb aggregate queries v2
;; Source: GitHub gist tkovis/92877d637f30b32d0aeb14a193d43d5f
;; Note from the gist page: this file may contain bidirectional Unicode text that
;; can be interpreted or compiled differently than it appears; review it in an
;; editor that reveals hidden Unicode characters.
(ns repro
  (:require
   [clojure.java.io :as io]
   [xtdb.api :as xt]
   [xtdb.query]))
(comment
  ;; deps.edn coordinates this repro was written against.
  '{com.xtdb/xtdb-core    {:mvn/version "1.19.0-beta1"}
    com.xtdb/xtdb-rocksdb {:mvn/version "1.19.0-beta1"}
    com.xtdb/xtdb-lucene  {:mvn/version "1.19.0-beta1"}})

(def node-config
  "XTDB node configuration shared by the initial node and `restart-node`.
  RocksDB backs the index store, document store and tx-log; a Lucene store
  uses the multi-field indexer. All on-disk state lives under \"repro\".

  The checkpointer is configured on the index store's RocksDB `:kv-store`,
  which is where the RocksDB module takes its `:checkpointer` option. It was
  previously a sibling of `:kv-store`, where it appeared not to be read."
  '{:xtdb/index-store {:kv-store {:xtdb/module xtdb.rocksdb/->kv-store
                                  :db-dir "repro/rocksdb/index"
                                  :checkpointer {:xtdb/module xtdb.checkpoint/->checkpointer
                                                 :store {:xtdb/module xtdb.checkpoint/->filesystem-checkpoint-store
                                                         :path "repro/xtdb-checkpoint"}
                                                 :approx-frequency "PT1H"}}}
    :xtdb/document-store {:kv-store {:xtdb/module xtdb.rocksdb/->kv-store
                                     :db-dir "repro/rocksdb/document"}}
    :xtdb/tx-log {:kv-store {:xtdb/module xtdb.rocksdb/->kv-store
                             :db-dir "repro/rocksdb/tx"}}
    :xtdb.lucene/lucene-store {:db-dir "repro/lucene"
                               :indexer xtdb.lucene.multi-field/->indexer}})

(def node (xt/start-node node-config))

(defn delete-recursively
  "Deletes the file or directory `fname` together with everything beneath it.
  `file-seq` lists parents before children, so the seq is reversed so that
  every directory is already empty when it is deleted. Does nothing when
  `fname` does not exist; throws if an existing file cannot be deleted."
  [fname]
  (let [root (io/file fname)]
    (when (.exists root)
      (doseq [f (reverse (file-seq root))]
        (io/delete-file f)))))

(defn restart-node
  "Closes the current `node`, wipes the \"repro\" data directory and starts a
  fresh node from `node-config`, rebinding the `node` var to it.
  Failures while closing or deleting are printed, and the other steps still run."
  []
  (try
    (.close node)
    (catch Exception e (println e)))
  ;; separate try so a failed close does not silently skip the wipe
  (try
    (delete-recursively "repro")
    (catch Exception e (println e)))
  (alter-var-root #'node (constantly (xt/start-node node-config))))
(defn ->uuid
  "Ignores its argument and returns a new random UUID; taking one arg lets it
  be used directly as `(map ->uuid (range))`."
  [_]
  (java.util.UUID/randomUUID))

;; One atom per entity kind, each starting as an empty vector. The data
;; generator fills them with [::xt/put doc] transaction operations.
(def parent-atom (atom []))
(def c1-atom (atom []))
(def c2-atom (atom []))
(def c3-atom (atom []))
(def c4-atom (atom []))
(defn c3->c4
  "Fetches entity `eid` from `db` and returns its :c3/c4 value, or nil when
  the entity has a truthy :c3/must-not-have (or is missing / has no :c3/c4)."
  [db eid]
  (let [{:c3/keys [must-not-have c4]} (xt/entity db eid)]
    (when-not must-not-have
      c4)))
(defn c3->c4-v2
  "Query-based variant of `c3->c4`. Returns the :c3/c4 value of entity `c3`
  in `db`, or nil when the first query finds a truthy :c3/must-not-have.

  Runs two single-variable queries. The result sets are unordered, so if an
  attribute holds several values `ffirst` picks an arbitrary one."
  [db c3]
  ;; Plain quote keeps the logic vars as simple symbols. The original
  ;; syntax-quote rewrote them to repro/c3, repro/c4, repro/must-not-have.
  (when-not (ffirst (xt/q db '{:find [must-not-have]
                               :in [c3]
                               :where [[c3 :c3/must-not-have must-not-have]]}
                          c3))
    (ffirst (xt/q db '{:find [c4]
                       :in [c3]
                       :where [[c3 :c3/c4 c4]]}
                  c3))))
;; Custom aggregate `count-c4`: counts aggregated c4 ids whose entity has a
;; truthy :c4/must-have and a falsy :c4/must-not-have.
;; Arities: [] initial accumulator, [acc] completion, [acc eid] step.
;; NOTE(review): every step opens a new db via (xt/db node), so entity reads are
;; not tied to the db value the surrounding query was run against.
(defmethod xtdb.query/aggregate 'count-c4 [_]
  (fn
    ([] 0)
    ([acc] acc)
    ([acc eid]
     (let [{:c4/keys [must-have must-not-have]} (xt/entity (xt/db node) eid)]
       (cond-> acc
         (and must-have (not must-not-have)) inc)))))
(def c4-must-have-rate
  "Probability that a generated c4 document gets :c4/must-have true."
  1.0)

(def c4-must-not-have-rate
  "Probability that a c4 which already has :c4/must-have also gets
  :c4/must-not-have true."
  0.0)

(def c3-must-not-have-rate
  "Probability that a generated c3 document gets :c3/must-not-have true."
  0.0)

(defn- put-op
  "Wraps `doc` in an XTDB put transaction operation."
  [doc]
  [::xt/put doc])

(defn- op-id
  "Returns the :xt/id of the document carried by a put operation."
  [op]
  (get-in op [1 :xt/id]))

(defn- fresh-ids
  "Returns a lazy seq of `n` random UUIDs."
  [n]
  (->> (range) (take n) (map ->uuid)))

;; Builds the synthetic dataset in the background.
;; Each atom is reset! to a complete vector of put operations, so re-evaluating
;; this form regenerates the dataset. (The previous swap!/conj version appended
;; duplicates of every document on each re-run.)
;; c1 and c3 documents pick ids from the parents/c2s/c4s generated earlier, so
;; the order of the steps below matters.
(future
  (try
    (reset! parent-atom
            (mapv (fn [parent]
                    (put-op {:xt/id parent
                             :parent/string (str (hash parent))}))
                  (fresh-ids 10)))
    (reset! c4-atom
            (mapv (fn [c4]
                    (let [must-have (< (rand) c4-must-have-rate)
                          must-not-have (and must-have (< (rand) c4-must-not-have-rate))]
                      ;; (or flag nil) stores an explicit nil rather than false
                      (put-op {:xt/id c4
                               :c4/must-have (or must-have nil)
                               :c4/must-not-have (or must-not-have nil)
                               :c4/string (str (hash c4))})))
                  (fresh-ids 300000)))
    (reset! c2-atom
            (mapv (fn [c2]
                    (put-op {:xt/id c2
                             :c2/string (str (hash c2))}))
                  (fresh-ids 30000)))
    (let [parents @parent-atom
          c2s @c2-atom]
      (reset! c1-atom
              (mapv (fn [c1]
                      (put-op {:xt/id c1
                               :c1/parent (op-id (rand-nth parents))
                               :c1/c2 (op-id (rand-nth c2s))
                               :c1/string (str (hash c1))}))
                    (fresh-ids 1000))))
    (let [c2s @c2-atom
          c4s @c4-atom]
      (reset! c3-atom
              (mapv (fn [c3]
                      (put-op {:xt/id c3
                               :c3/must-not-have (or (< (rand) c3-must-not-have-rate) nil)
                               :c3/c2 (op-id (rand-nth c2s))
                               :c3/c4 (op-id (rand-nth c4s))
                               :c3/string (str (hash c3))}))
                    (fresh-ids 1000))))
    (println "atoms done")
    ;; A future's exception only surfaces on deref, and nothing derefs this one.
    (catch Exception e
      (println "dataset generation failed:" e))))
(comment
  ; ===== explicit nils: missing flags are stored as attributes with nil values =====
  (future
    (restart-node)
    (->> (concat @parent-atom @c1-atom @c2-atom @c3-atom @c4-atom)
         (partition-all 500)
         (map (fn [batch]
                (xt/submit-tx node (vec batch))))
         last
         (xt/await-tx node))
    (println "done"))

  (xt/attribute-stats node)

  ; vanilla
  (time
   (dotimes [_ 100]
     (time
      (xt/q (xt/db node) '{:find [parent (count c4)]
                           :where [[c1 :c1/parent parent]
                                   [c1 :c1/c2 c2]
                                   [c3 :c3/c2 c2]
                                   [c3 :c3/must-not-have nil]
                                   [c3 :c3/c4 c4]
                                   [c4 :c4/must-have must-have]
                                   ; predicate keeps only truthy :c4/must-have values
                                   [(identity must-have)]
                                   [c4 :c4/must-not-have nil]]
                           :order-by [[(count c4) :desc]]
                           :offset 0
                           :limit 10}))))
  ; ~ 224ms / query

  ; custom aggregate
  (time
   (dotimes [_ 100]
     (time
      (xt/q (xt/db node) '{:find [parent (count-c4 c4)]
                           :where [[c1 :c1/parent parent]
                                   [c1 :c1/c2 c2]
                                   [c3 :c3/c2 c2]
                                   [c3 :c3/must-not-have nil]
                                   [c3 :c3/c4 c4]]
                           :order-by [[(count-c4 c4) :desc]]
                           :offset 0
                           :limit 10}))))
  ; ~ 88ms / query

  ; custom aggregate + custom predicate
  ; (:order-by added so this returns the same top-10 as the other benchmarks;
  ;  without it :limit 10 picks an arbitrary 10 groups)
  (time
   (dotimes [_ 100]
     (time
      (xt/q (xt/db node) '{:find [parent (count-c4 c4)]
                           :where [[c1 :c1/parent parent]
                                   [c1 :c1/c2 c2]
                                   [c3 :c3/c2 c2]
                                   [(repro/c3->c4 $ c3) c4]]
                           :order-by [[(count-c4 c4) :desc]]
                           :offset 0
                           :limit 10}))))
  ; ~ 17ms / query (measured before :order-by was added)

  ; ===== implicit nils: nil-valued attributes are dropped from the documents =====
  (future
    (restart-node)
    (->> (concat @parent-atom @c1-atom @c2-atom @c3-atom @c4-atom)
         (map (fn [[op doc]]
                [op (into {} (filter second doc))]))
         (partition-all 500)
         (map (fn [batch]
                (xt/submit-tx node (vec batch))))
         last
         (xt/await-tx node))
    (println "done"))

  (xt/attribute-stats node)

  ; vanilla
  (time
   (dotimes [_ 100]
     (time
      (xt/q (xt/db node) '{:find [parent (count c4)]
                           :where [[c1 :c1/parent parent]
                                   [c1 :c1/c2 c2]
                                   [c3 :c3/c2 c2]
                                   (not-join [c3] [c3 :c3/must-not-have])
                                   [c3 :c3/c4 c4]
                                   [c4 :c4/must-have must-have]
                                   (not-join [c4] [c4 :c4/must-not-have])]
                           :order-by [[(count c4) :desc]]
                           :offset 0
                           :limit 10}))))
  ; ~ 193ms / query

  ; custom aggregate + custom predicate
  (time
   (dotimes [_ 100]
     (time
      (xt/q (xt/db node) '{:find [parent (count-c4 c4)]
                           :where [[c1 :c1/parent parent]
                                   [c1 :c1/c2 c2]
                                   [c3 :c3/c2 c2]
                                   [(repro/c3->c4 $ c3) c4]]
                           :order-by [[(count-c4 c4) :desc]]
                           :offset 0
                           :limit 10}))))
  ; ~ 16ms / query

  ; custom aggregate + custom predicate v2
  (time
   (dotimes [_ 100]
     (time
      (xt/q (xt/db node) '{:find [parent (count-c4 c4)]
                           :where [[c1 :c1/parent parent]
                                   [c1 :c1/c2 c2]
                                   [c3 :c3/c2 c2]
                                   [(repro/c3->c4-v2 $ c3) c4]]
                           :order-by [[(count-c4 c4) :desc]]
                           :offset 0
                           :limit 10}))))
  ; ~ 25ms / query

  ; only closes the node; the (restart-node) call in the next form wipes the
  ; repro dir and starts a fresh node
  (.close node)

  ; ===== add precomputed attributes (:c3/ok?, :c4/ok?) to help with joins =====
  (future
    (restart-node)
    (->> (concat @parent-atom @c1-atom @c2-atom @c3-atom @c4-atom)
         (map (fn [[op doc]]
                [op (->> (filter second doc)
                         (into {}))]))
         (map (fn [[op doc]]
                (cond
                  (and (:c3/string doc)
                       (not (:c3/must-not-have doc)))
                  [op (assoc doc :c3/ok? true)]

                  (and (:c4/must-have doc)
                       (not (:c4/must-not-have doc)))
                  [op (assoc doc :c4/ok? true)]

                  :else [op doc])))
         (partition-all 500)
         (map (fn [batch]
                (xt/submit-tx node (vec batch))))
         last
         (xt/await-tx node))
    (println "done"))

  (xt/attribute-stats node)

  ; vanilla
  (time
   (dotimes [_ 100]
     (time
      (xt/q (xt/db node) '{:find [parent (count c4)]
                           :where [[c1 :c1/parent parent]
                                   [c1 :c1/c2 c2]
                                   [c3 :c3/c2 c2]
                                   [c3 :c3/ok?]
                                   [c3 :c3/c4 c4]
                                   [c4 :c4/ok?]]
                           :order-by [[(count c4) :desc]]
                           :offset 0
                           :limit 10}))))
  ; ~ 15ms / query
  )
;; (End of gist. On GitHub, sign up or sign in to comment on this gist.)