-
-
Save refset/a5f810f18590a6f0d00630b102b78d00 to your computer and use it in GitHub Desktop.
crux-multi
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns crux-multi
  "Highly experimental and not tested!

  Issues:
  - it probably doesn't handle multiple operations correctly
  - it is unable to safely re-sync after a crash
  - there is no evict for the logs in the root node

  Roadmap for improvement:
  - store the conformed operations instead and have the docs live under their
    own separate entities (for evictions)
  - a stubbed TxLog like crux-corda has
    (https://github.com/juxt/crux-corda/blob/54e8c6b589d81c7c564b50a54d1c806de80e6308/crux-corda/src/main/clojure/crux/corda.clj#L113-L130)
    ^ that would then allow the listener to trigger a generic sync function that
    would use the custom open-tx-log, and also call it at start-up (from a
    separate thread?)
  - it would also allow exposing the custom submit-tx for the child nodes
  - manage the child nodes as (individual?) components to ensure their
    lifecycles are correlated
  "
  ;; clojure.tools.logging is called fully qualified further down in this file,
  ;; so it is required explicitly here instead of relying on another namespace
  ;; having loaded it first.
  (:require [clojure.tools.logging]
            [crux.api :as crux]
            [crux.db :as db]
            [crux.tx.conform :as txc])
  ;; Java class names are case-sensitive: ICruxAPI and UUID, not icruxapi/uuid.
  (:import (crux.api ICruxAPI)
           (java.util UUID)))
;; Demo script: fan transactions out from one root node to several child nodes.
;;
;; A document put on the root node is expected to hold exactly three keys:
;;   :crux.db/id   - the id of the wrapper document
;;   <node-id>     - one of :n0 / :n1 / :n2, whose value is used as the tx-id
;;   :crux/tx-ops  - a vector whose first element is the vector of tx-ops to replay
;; A listener on the root node picks those tx-ops out of each indexed
;; transaction and indexes them directly into the selected child node.
;;
;; Every node and the listener are opened with `with-open`, so they are closed
;; (listener first, then nodes in reverse order) once the final query has
;; returned, instead of being leaked.
(with-open [root-node (crux/start-node {})
            n0 (crux/start-node {})
            n1 (crux/start-node {})
            n2 (crux/start-node {})]
  (let [multi-nodes {:n0 n0
                     :n1 n1
                     :n2 n2}]
    (with-open [_listener
                (crux/listen
                  root-node
                  {:crux/event-type :crux/indexed-tx :with-tx-ops? true}
                  (fn listener [{:keys [:crux.tx/tx-time] :as e}]
                    (let [;; Document of the first operation in the root transaction.
                          {:keys [crux/tx-ops] :as tx-doc} (-> e
                                                               :crux/tx-ops
                                                               first
                                                               second)
                          _ (assert (= 3 (count (keys tx-doc))))
                          ;; Only the first nested group of tx-ops is replayed.
                          tx-ops (first tx-ops)
                          ;; The one key that is neither the id nor the payload
                          ;; names the target child node.
                          node-id (-> tx-doc
                                      keys
                                      set
                                      (disj :crux.db/id :crux/tx-ops)
                                      first)
                          tx-id (node-id tx-doc)]
                      (try
                        (let [tx-ops (crux.api/conform-tx-ops tx-ops)
                              conformed-tx-ops (mapv txc/conform-tx-op tx-ops)
                              docs (into {} (mapcat :docs conformed-tx-ops))
                              tx-events (mapv txc/->tx-event conformed-tx-ops)
                              ;; NOTE(review): the key here is :doc (singular);
                              ;; confirm whether anything downstream reads it.
                              tx {:crux.tx/tx-events tx-events
                                  :doc docs
                                  :crux.tx/tx-time tx-time
                                  :crux.tx/tx-id tx-id}
                              {:keys [tx-ingester document-store]} (node-id multi-nodes)
                              in-flight-tx (db/begin-tx tx-ingester tx nil)]
                          (try
                            (db/submit-docs document-store docs)
                            (db/index-tx-events in-flight-tx tx-events)
                            (db/commit in-flight-tx)
                            (catch Exception e
                              (clojure.tools.logging/error
                                e
                                (format "Node %s failed to index transaction %s" node-id tx-id))
                              ;; Abort the in-flight transaction after a failed indexing attempt.
                              (db/abort in-flight-tx))))
                        (catch Exception e
                          (clojure.tools.logging/error
                            e
                            (format "Node %s received invalid transaction %s" node-id tx-id)))))))]
      ;; Submit one wrapper document that targets child node :n0 with tx-id 0.
      (crux/submit-tx root-node
                      [[:crux.tx/put {:crux.db/id (java.util.UUID/randomUUID)
                                      :n0 0
                                      :crux/tx-ops [[[:crux.tx/put {:crux.db/id :foo
                                                                    :bar :baz}]]]}]])
      ;; Crude wait for the listener to run; the replay happens asynchronously.
      (Thread/sleep 100)
      ;; Read the replayed entity back from the child node.
      (crux/entity (crux/db (:n0 multi-nodes)) :foo))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment