Skip to content

Instantly share code, notes, and snippets.

@timewald
Created November 15, 2015 16:09
Show Gist options
  • Save timewald/d9acbde47debd8a6793c to your computer and use it in GitHub Desktop.
Save timewald/d9acbde47debd8a6793c to your computer and use it in GitHub Desktop.
;; Set up a database to test with.
;; Note that this setup uses a local dev transactor. You can use a
;; different transactor, but you cannot use a mem db because it does
;; not support the log API.
(require '[datomic.api :as d])
;; URI of the scratch database on the local dev transactor.
(def uri "datomic:dev://localhost:4334/reified-txes")
;; Delete and recreate the database so every run starts from an empty db.
;; WARNING: this destroys any existing database at `uri`.
(d/delete-database uri)
(d/create-database uri)
;; Connection used by the transactions and queries in this script.
(def conn (d/connect uri))
;; Schema for all examples. Every attribute is installed in :db.part/db;
;; the local `attr` helper fills in the keys common to each attribute map.
(def schema
  (let [;; Builds one attribute-install map. `extras` carries the optional
        ;; keys (:db/unique, :db/index) as trailing key/value pairs.
        attr (fn [ident value-type cardinality & {:as extras}]
               (merge {:db/id                 (d/tempid :db.part/db)
                       :db/ident              ident
                       :db/valueType          value-type
                       :db/cardinality        cardinality
                       :db.install/_attribute :db.part/db}
                      extras))
        one  :db.cardinality/one
        many :db.cardinality/many]
    [;; base person info
     (attr :person/name :db.type/string one)
     (attr :person/email :db.type/string one)
     ;; saga
     (attr :person/saga :db.type/long one)
     ;; audit
     (attr :tx/user :db.type/string one
           :db/index true)
     ;; unique txid
     (attr :tx/id :db.type/uuid one
           :db/unique :db.unique/value
           :db/index true)
     ;; cfg mgmt
     (attr :list/name :db.type/string one
           :db/unique :db.unique/value
           :db/index true)
     (attr :list/person :db.type/ref many)
     ;; NOTE(review): :db.unique/value means no two lists can hold the same
     ;; version number at the same time -- confirm this is intended.
     (attr :list/version :db.type/long one
           :db/unique :db.unique/value
           :db/index true)
     ;; import
     (attr :import/name :db.type/string one
           :db/unique :db.unique/value
           :db/index true)
     (attr :import/complete :db.type/boolean one
           :db/index true)
     (attr :import/tx :db.type/ref many)
     ;; auto compensation
     (attr :tx/compensates :db.type/ref one)]))
;; transact the schema
@(d/transact conn schema)
;; auditing
;; Each transaction below adds a person and also annotates the transaction
;; entity itself (the :db.part/tx tempid) with the responsible user.
@(d/transact conn [{:db/id (d/tempid :db.part/user)
:person/name "Tim"}
{:db/id (d/tempid :db.part/tx)
:tx/user "Bob" }])
@(d/transact conn [{:db/id (d/tempid :db.part/user)
:person/name "Gus"}
{:db/id (d/tempid :db.part/tx)
:tx/user "Bob" }])
;; Find every datom (entity, attribute, value, tx, and the added/retracted
;; flag ?op) written by transactions whose :tx/user is ?who. The datoms are
;; read from the log via tx-data, which is why a log value is passed in.
(d/q '[:find ?e ?a ?v ?tx ?op
:in $ ?log ?who
:where
[?tx :tx/user ?who]
[(tx-data ?log ?tx) [[?e ?a ?v _ ?op]]]]
(d/db conn)
(d/log conn)
"Bob")
;; ensure work was done
;; A fixed UUID identifying this unit of work; it is stored on the
;; transaction entity under the unique attribute :tx/id.
(def unique-txid #uuid "a14163db-6f66-40d5-9a3b-0f8891f8cada")
;; The work itself (a new person) plus the :tx/id marker on the transaction.
(def unique-tx [{:db/id (d/tempid :db.part/user)
:person/name "Sarah"}
{:db/id (d/tempid :db.part/tx)
:tx/id unique-txid}])
@(d/transact conn unique-tx)
;; Guarded retry: only transact when the AVET index holds no datom for this
;; :tx/id. If the transact fails with a :db.error/unique-conflict whose
;; message mentions :tx/id, the error is swallowed (presumably the work was
;; committed between the check and the transact -- confirm); any other
;; failure is rethrown unchanged.
(when (empty? (d/datoms (d/db conn) :avet :tx/id unique-txid))
(try
@(d/transact conn unique-tx)
(catch Throwable t
(let [cause (.getCause t)]
(when-not
(and (= :db.error/unique-conflict
(:db/error (ex-data cause)))
(.contains (.getMessage cause) ":tx/id"))
(throw t))))))
;; versioned data
;; Create the list "Ewalds" at version 0.
@(d/transact conn [{:db/id (d/tempid :db.part/user)
:list/name "Ewalds"
:list/version 0}])
;; Add "Tim" to the list through the reverse reference :list/_person,
;; addressing the list by the lookup ref [:list/name "Ewalds"].
@(d/transact conn [{:db/id (d/tempid :db.part/user)
:person/name "Tim"
:list/_person [:list/name "Ewalds"]}])
;; Set the list's version to 1.
@(d/transact conn [{:db/id [:list/name "Ewalds"]
:list/version 1}])
;; Add "Gus" to the list.
@(d/transact conn [{:db/id (d/tempid :db.part/user)
:person/name "Gus"
:list/_person [:list/name "Ewalds"]}])
;; Set the list's version to 2.
@(d/transact conn [{:db/id [:list/name "Ewalds"]
:list/version 2}])
(defn db-for-version-of-list
  "Returns `db` as of the transaction that asserted :list/version `ver`
  on the list named `list-name`.

  The transaction is looked up in the history view of `db`, keeping only
  datoms whose added flag is true. The first query result is used."
  [db list-name ver]
  (let [history   (d/history db)
        version-q '[:find ?tx
                    :in $ ?lname ?lver
                    :where
                    [?l :list/name ?lname]
                    [?l :list/version ?lver ?tx ?op]
                    [(= ?op true)]]
        tx-id     (ffirst (d/q version-q history list-name ver))]
    (d/as-of db tx-id)))
(defn names-on-version-of-list
  "Returns the query result of person names (one-element tuples) on the
  list named `list-name`.

  Arities:
    [db list-name]      uses the list's current :list/version in `db`.
    [db list-name ver]  uses the database as of the transaction that set
                        the list to version `ver`
                        (see db-for-version-of-list).

  The list name is passed to the membership query as a parameter, so the
  function works for any list rather than only \"Ewalds\"."
  ([db list-name]
   (let [current-ver (ffirst (d/q '[:find ?lver
                                    :in $ ?lname
                                    :where
                                    [?l :list/name ?lname]
                                    [?l :list/version ?lver]]
                                  db
                                  list-name))]
     (names-on-version-of-list db list-name current-ver)))
  ([db list-name ver]
   (d/q '[:find ?pname
          :in $ ?lname
          :where
          [?l :list/name ?lname]
          [?l :list/person ?p]
          [?p :person/name ?pname]]
        (db-for-version-of-list db list-name ver)
        ;; Fix: was the literal "Ewalds", which ignored `list-name`.
        list-name)))
;; Example: names on the current version of the "Ewalds" list.
(names-on-version-of-list (d/db conn) "Ewalds")
;; import
;; Create the import entity, initially marked incomplete.
@(d/transact conn [{:db/id (d/tempid :db.part/user)
:import/name "adding-names-1"
:import/complete false}])
;; Each import step adds a person and records the current transaction
;; (the :db.part/tx tempid) on the import via the cardinality-many
;; :import/tx attribute.
@(d/transact conn [{:db/id (d/tempid :db.part/user)
:person/name "Tim"}
{:db/id [:import/name "adding-names-1"]
:import/tx (d/tempid :db.part/tx)}])
@(d/transact conn [{:db/id (d/tempid :db.part/user)
:person/name "Gus"}
{:db/id [:import/name "adding-names-1"]
:import/tx (d/tempid :db.part/tx)}])
;; Final step: record this transaction too and mark the import complete.
@(d/transact conn [{:db/id [:import/name "adding-names-1"]
:import/tx (d/tempid :db.part/tx)
:import/complete true}])
;; Rule set with a single rule: ?p is an entity whose :person/name was
;; written in a transaction ?tx that is referenced through :import/tx by an
;; import whose :import/complete is true.
(def people '[[(people ?p)
[?i :import/complete true]
[?i :import/tx ?tx]
[?p :person/name _ ?tx]]])
;; Pull :db/id and :person/name for every entity matched by the people rule;
;; the rule set is passed in as the % input.
(d/q '[:find (pull ?p [:db/id :person/name])
:in $ %
:where
[people ?p]]
(d/db conn)
people)
;; automatic compensation
(defn invert-tx
  "Builds the transaction data that compensates for transaction `tx`.

  Reads the datoms of `tx` from `log` and emits the opposite operation for
  each one: added datoms become :db/retract, retracted datoms become
  :db/add. Datoms whose entity is the transaction entity itself are
  skipped. The result starts with an assertion that links the new
  transaction to the original via :tx/compensates."
  [log tx]
  (let [link   [[:db/add (d/tempid :db.part/tx) :tx/compensates tx]]
        datoms (-> (d/tx-range log tx (inc tx)) first :data)]
    (into link
          (for [[e a v datom-tx added] datoms
                ;; drop datoms that describe the original transaction entity
                :when (not= e datom-tx)]
            [(if added :db/retract :db/add) e a v]))))
;; Two transactions that belong to the same saga: each one annotates its
;; transaction entity (the :db.part/tx tempid) with :person/saga 10.
@(d/transact conn [{:db/id (d/tempid :db.part/user)
:person/name "Tim"
:person/email "tim@cognitect.com"}
{:db/id (d/tempid :db.part/tx)
:person/saga 10}])
@(d/transact conn [{:db/id (d/tempid :db.part/user)
:person/name "Gus"}
{:db/id (d/tempid :db.part/tx)
:person/saga 10}])
;; Transactions tagged with saga 10, sorted in descending order.
(def saga-txes
  (sort > (for [[tx] (d/q '[:find ?tx :where [?tx :person/saga 10]] (d/db conn))]
            tx)))
;; One compensating transaction per saga transaction. The sequence is lazy,
;; so each invert-tx call runs when the sequence is consumed below.
(def compensating-txes
  (let [log (d/log conn)]
    (for [saga-tx saga-txes]
      (invert-tx log saga-tx))))
;; Apply the compensating transactions in order, printing each result.
(run! (fn [tx-data] (prn @(d/transact conn tx-data)))
      compensating-txes)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment