@refset
Last active April 11, 2021 11:55
crux-multi
(ns crux-multi
  "Highly experimental and not tested!

  Issues:
    it probably doesn't handle multiple operations correctly
    it is unable to safely re-sync after a crash
    there is no evict for the logs in the root node

  Roadmap for improvement:
    store the conformed operations instead, and have the docs live under their own separate entities (for evictions)
    a stubbed TxLog like crux-corda has (https://github.com/juxt/crux-corda/blob/54e8c6b589d81c7c564b50a54d1c806de80e6308/crux-corda/src/main/clojure/crux/corda.clj#L113-L130)
      ^ that would then allow the listen callback to trigger a generic sync function that would use the custom open-tx-log, and also call it at start-up (from a separate thread?)
      it would also allow exposing the custom submit-tx for the child nodes
    manage the child nodes as (individual?) components to keep their lifecycles correlated
  "
  (:require [clojure.tools.logging]
            [crux.api :as crux]
            [crux.tx.conform :as txc]
            [crux.db :as db])
  (:import (crux.api ICruxAPI)
           java.util.UUID))
(let [root-node (crux/start-node {})
      multi-nodes {:n0 (crux/start-node {})
                   :n1 (crux/start-node {})
                   :n2 (crux/start-node {})}]
  ;; Replay every transaction indexed by the root node into the child node it names.
  (crux/listen root-node
               {:crux/event-type :crux/indexed-tx :with-tx-ops? true}
               (fn listener [{:keys [:crux.tx/tx-time] :as e}]
                 ;; The first op of the root transaction is expected to be a put;
                 ;; `second` extracts its (wrapper) document.
                 (let [{:keys [crux/tx-ops] :as tx-doc}
                       (-> e
                           :crux/tx-ops
                           first
                           second)
                       ;; The wrapper doc must hold exactly :crux.db/id, :crux/tx-ops
                       ;; and one key naming the target child node.
                       _ (assert (= 3 (count (keys tx-doc))))
                       tx-ops (first tx-ops)
                       node-id
                       (-> tx-doc
                           keys
                           set
                           (disj :crux.db/id :crux/tx-ops)
                           first)
                       ;; The value stored under the node key is used as the child tx-id.
                       tx-id (node-id tx-doc)]
                   (try
                     (let [tx-ops (crux.api/conform-tx-ops tx-ops)
                           conformed-tx-ops (mapv txc/conform-tx-op tx-ops)
                           docs (into {} (mapcat :docs conformed-tx-ops))
                           tx-events (mapv txc/->tx-event conformed-tx-ops)
                           tx {:crux.tx/tx-events tx-events
                               :doc docs
                               :crux.tx/tx-time tx-time
                               :crux.tx/tx-id tx-id}
                           {:keys [tx-ingester document-store]} (node-id multi-nodes)]
                       (let [in-flight-tx (db/begin-tx tx-ingester tx nil)]
                         (try
                           (db/submit-docs document-store docs)
                           (db/index-tx-events in-flight-tx tx-events)
                           (db/commit in-flight-tx)
                           (catch Exception e
                             (clojure.tools.logging/error e (format "Node %s failed to index transaction %s" node-id tx-id))
                             (db/abort in-flight-tx)))))
                     (catch Exception e
                       (clojure.tools.logging/error e (format "Node %s received invalid transaction %s" node-id tx-id)))))))
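  ;; Submit a wrapper document to the root node: the :n0 key targets child node :n0
  ;; with tx-id 0, and :crux/tx-ops carries the operations to replay there.
  (crux/submit-tx root-node
                  [[:crux.tx/put {:crux.db/id (java.util.UUID/randomUUID)
                                  :n0 0
                                  :crux/tx-ops [[[:crux.tx/put {:crux.db/id :foo
                                                                :bar :baz}]]]}]])
  ;; Give the listener a moment to index the transaction into the child node.
  (Thread/sleep 100)
  (crux/entity (crux/db (:n0 multi-nodes)) :foo))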
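
The listener relies on an implicit layout for the wrapper document: `:crux.db/id`, `:crux/tx-ops`, and exactly one further key that names the child node and holds the tx-id. As a rough illustration only, that routing step could be pulled out into a pure function that can be tried at a REPL without starting any nodes. `parse-wrapper-doc` is a hypothetical name, not part of the gist or of Crux, and the explicit `ex-info` check is an assumption standing in for the original `assert`.

```clojure
(defn parse-wrapper-doc
  "Hypothetical helper: splits a wrapper document into the target node id,
  the tx-id stored under that node key, and the first batch of tx-ops."
  [tx-doc]
  (let [routing-keys (disj (set (keys tx-doc)) :crux.db/id :crux/tx-ops)]
    ;; Exactly one key besides :crux.db/id and :crux/tx-ops must remain.
    (when-not (= 1 (count routing-keys))
      (throw (ex-info "Expected exactly one routing key"
                      {:routing-keys routing-keys})))
    (let [node-id (first routing-keys)]
      {:node-id node-id
       :tx-id   (get tx-doc node-id)
       ;; As in the listener above, only the first batch of ops is taken.
       :tx-ops  (first (:crux/tx-ops tx-doc))})))

(parse-wrapper-doc
 {:crux.db/id  (java.util.UUID/randomUUID)
  :n0          0
  :crux/tx-ops [[[:crux.tx/put {:crux.db/id :foo :bar :baz}]]]})
;; => {:node-id :n0, :tx-id 0, :tx-ops [[:crux.tx/put {:crux.db/id :foo, :bar :baz}]]}
```

Keeping this step free of Crux calls would make the "exactly three keys" assumption testable on its own, separately from the indexing side effects.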