Skip to content

Instantly share code, notes, and snippets.

@ChrisBlom
Last active March 22, 2024 11:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ChrisBlom/940fdeab7066802b203be4380d5ea1ae to your computer and use it in GitHub Desktop.
Save ChrisBlom/940fdeab7066802b203be4380d5ea1ae to your computer and use it in GitHub Desktop.
Datomic change data capture
(ns datomic-cdc.core
  "Proof of concept showing how to set up change data capture (CDC) for
  Datomic: past transactions are replayed from the log, after which new
  transactions are consumed from the connection's tx-report-queue."
  (:require [datomic.api :as d]))
(def processed-t-
  "Atom holding the t of the last transaction handled by the demo change
  handler (see the `comment` block); nil until something has been processed."
  (atom nil))
(defn start-cdc-thread
  "Starts a new daemon thread that first processes all past transactions
  starting at `start-t` (inclusive), then keeps processing incoming
  transactions, passing each one to `change-handler`.

  `change-handler` must be a function that takes a single map argument with
    :t         the t of the transaction
    :data      the transacted datoms
    :db-after  the database as it is after the transaction

  If `start-t` is nil, starts from the beginning of the log. `start-t` itself
  is processed, so to resume after a previously handled t pass `(inc t)`.

  An exception thrown while processing (including one thrown by
  `change-handler`) is printed and ends the thread.

  Returns a 0-arg function that stops the thread when invoked. It returns
  true the first time it stops the thread and nil on later calls."
  [conn start-t change-handler]
  (let [;; Register the report queue *before* taking the snapshot below, so
        ;; every transaction committed after the snapshot shows up in the queue.
        txq (d/tx-report-queue conn)
        db (d/db conn)
        basis-t (d/basis-t db)
        ;; Bound the historical range at the snapshot (the end is exclusive).
        ;; It then covers exactly the transactions the queue may have missed,
        ;; and yields the same transactions however often it is traversed.
        txs-in-range (d/tx-range (d/log conn) start-t (inc basis-t))
        ;; Queue reports with t <= last-t-in-range are either already covered
        ;; by txs-in-range or precede start-t, so they are skipped. This value
        ;; is never nil, unlike a t read from a possibly empty range.
        last-t-in-range (if start-t (max basis-t (dec start-t)) basis-t)
        running (atom true)
        cdc-loop
        (fn cdc-loop []
          ;; The try must live inside the fn so it guards the thread's work.
          (try
            (println "Processing tx's from" (:t (first txs-in-range)) "to" last-t-in-range)
            ;; process all transactions from start-t up to the snapshot,
            ;; bailing out early if the stop fn has been called
            (doseq [{:keys [t data]} txs-in-range
                    :while @running]
              (change-handler {:t t
                               :data data
                               :db-after (d/as-of db t)}))
            ;; process new transactions
            (println "Processing tx's after " last-t-in-range)
            (try
              (println "started cdc thread")
              (while @running
                (let [tx (.take txq)
                      t (d/basis-t (:db-after tx))]
                  (when (> t last-t-in-range) ; skip tx's that are also reported by tx-range
                    (change-handler {:t t
                                     :data (into [] (:tx-data tx))
                                     :db-after (:db-after tx)}))))
              ;; the stop fn interrupts a thread blocked in .take
              (catch InterruptedException _
                (println "Stopped cdc thread")))
            (catch Exception e
              (.printStackTrace e))))
        process-thread (Thread. ^Runnable cdc-loop "datomic-cdc-thread")]
    (.setDaemon process-thread true)
    (.start process-thread)
    (fn cdc-stop-fn []
      ;; swap-vals! returns [old new]; only the first caller sees old = true
      (when-let [was-running (first (swap-vals! running (constantly false)))]
        (println "Stopping cdc thread...")
        (d/remove-tx-report-queue conn)
        (.interrupt process-thread)
        was-running))))
(def followers-by-user-
  "Atom holding the :user/follows relation realized as
  {user-name #{followed-user-name ...}}, maintained by `followers-handler`.
  Despite the name, each value is the set of users the key *follows*."
  (atom {}))
(defn add-follower
  "Returns `follow-by-user` with `follower` added to the set stored under
  `user`; a missing or nil entry starts out as an empty set."
  [follow-by-user user follower]
  (let [current (get follow-by-user user)
        members (if (nil? current) #{} current)]
    (assoc follow-by-user user (conj members follower))))
(defn remove-follower
  "Returns `follow-by-user` with `follower` removed from the set stored under
  `user`. When `user` has no entry the map is returned unchanged, instead of
  gaining a `user -> nil` entry (which `(update m user disj x)` would add,
  since `(disj nil x)` is nil)."
  [follow-by-user user follower]
  (if (contains? follow-by-user user)
    (update follow-by-user user disj follower)
    follow-by-user))
;; NOTE(review): not referenced by any visible code; presumably intended to
;; hold per-user message feeds (cf. the :user/message attribute) — confirm
;; its purpose or remove it.
(def feed-by-user (atom {}))
(defn followers-handler
  "Change handler that applies *every* :user/follows datom in a change event
  to `followers-by-user-`: an assertion adds the followed user's name to the
  following user's set, and a retraction removes it. Users are identified by
  their :user/name as seen in `db-after`. Returns nil."
  [{:keys [data db-after]}]
  ;; Resolve the attribute id once per event rather than once per datom.
  ;; It is nil before the schema exists, in which case nothing matches.
  (let [follows-attr (d/entid db-after :user/follows)]
    (doseq [[e a v _ add] data
            :when (= a follows-attr)]
      ;; NOTE(review): names are read from db-after, so if the same
      ;; transaction also retracts a user's :user/name (e.g. retractEntity)
      ;; the looked-up name will be nil — confirm whether that case matters.
      (let [user (:user/name (d/entity db-after e))
            follows (:user/name (d/entity db-after v))]
        (println "Updating follow list: " user " -> " follows " : " add)
        (swap! followers-by-user- (if add add-follower remove-follower) user follows)))))
(comment
  (def db-uri "datomic:mem://tx-cdc-demo")
  (d/create-database db-uri)
  (def conn (d/connect db-uri))

  ;; Change handler shared by both runs of the cdc thread below: keeps
  ;; followers-by-user- up to date and records the last processed t.
  (defn handle-change [change-event]
    (println "Handling change: " change-event)
    (followers-handler change-event)
    (println "Setting processed-t to" (:t change-event))
    (reset! processed-t- (:t change-event)))

  ;; start-t is inclusive, so resume *after* the last processed t;
  ;; nil (nothing processed yet) means: start from the beginning.
  (def cdc-thread (start-cdc-thread conn (some-> @processed-t- inc) handle-change))

  ;; setup schema
  @(d/transact conn
               [{:db/ident :user/name
                 :db/valueType :db.type/string
                 :db/cardinality :db.cardinality/one
                 :db/unique :db.unique/identity
                 :db/doc "The name of the user"}
                {:db/ident :user/follows
                 :db/valueType :db.type/ref
                 :db/cardinality :db.cardinality/many
                 :db/doc "The other users that this user follows"}
                {:db/ident :user/message
                 :db/valueType :db.type/string
                 :db/cardinality :db.cardinality/many
                 :db/doc "The message produced by the user"}
                {:db/ident :message/content
                 :db/valueType :db.type/string
                 :db/cardinality :db.cardinality/one
                 :db/doc "The content of a message"}
                {:db/ident :message/timestamp
                 :db/valueType :db.type/instant
                 :db/cardinality :db.cardinality/one
                 :db/doc "The timestamp when the message was written"}])

  ;; add users
  @(d/transact conn [{:db/id (d/tempid (d/implicit-part 1)) :user/name "Chris Blom"}
                     {:db/id (d/tempid (d/implicit-part 1)) :user/name "Rich Hickey"}
                     {:db/id (d/tempid (d/implicit-part 1)) :user/name "Alan Kay"}
                     {:db/id (d/tempid (d/implicit-part 1)) :user/name "Guy Steele"}])

  ;; follow some people
  @(d/transact conn [[:db/add [:user/name "Chris Blom"] :user/follows [:user/name "Rich Hickey"]]])
  @(d/transact conn [[:db/add [:user/name "Rich Hickey"] :user/follows [:user/name "Alan Kay"]]])
  @(d/transact conn [[:db/add [:user/name "Chris Blom"] :user/follows [:user/name "Alan Kay"]]])
  @(d/transact conn [[:db/add [:user/name "Chris Blom"] :user/follows [:user/name "Guy Steele"]]])

  ;; the followers-by-user- state should show the followers
  @followers-by-user-

  ;; unfollow Alan Kay
  @(d/transact conn [[:db/retract [:user/name "Chris Blom"] :user/follows [:user/name "Alan Kay"]]])

  ;; the followers-by-user- state is being kept up to date with the changes to
  ;; the database (the cdc thread is asynchronous, so it may need a moment)
  (contains? (get @followers-by-user- "Chris Blom") "Alan Kay")
  ;; => false

  ;; the followers-by-user- state should reflect the followers in the database
  (into {}
        (d/q '[:find ?u-name (distinct ?f-name)
               :where
               [?u :user/name ?u-name]
               [?u :user/follows ?f]
               [?f :user/name ?f-name]]
             (d/db conn)))

  ;; processed-t- tracks the last processed transaction
  @processed-t-

  ;; the cdc loop can be stopped
  (cdc-thread)

  ;; update a fact while the cdc loop is stopped
  @(d/transact conn [[:db/retract [:user/name "Chris Blom"] :user/follows [:user/name "Rich Hickey"]]])

  ;; followers-by-user- is now not in sync with the database
  (contains? (get @followers-by-user- "Chris Blom") "Rich Hickey")
  ;; => true

  ;; restarting the cdc loop right after @processed-t- will process the
  ;; transactions that were performed while the cdc loop was stopped
  (def cdc-thread (start-cdc-thread conn (some-> @processed-t- inc) handle-change))

  ;; followers-by-user- is in sync again with the database:
  (contains? (get @followers-by-user- "Chris Blom") "Rich Hickey")
  ;; => false
  )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment