Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hmaurer/8067fc1d2baea887a671120adb8ec003 to your computer and use it in GitHub Desktop.
Save hmaurer/8067fc1d2baea887a671120adb8ec003 to your computer and use it in GitHub Desktop.
Erasing data from Datomic via a manual data migration
;; # EMULATING DATOMIC EXCISION VIA MANUAL DATA MIGRATION
;; *************************************
;; ## Introduction
;; *************************************
;; This Gist demonstrates a generic way to migrate an entire Datomic database
;; from an existing 'Origin Connection' to a clean 'Destination Connection',
;; while getting rid of some undesirable data and otherwise preserving history.
;; (In this example, the undesirable data consists of the values stored in some
;; attributes, but you can adapt the processing to your use case.)
;; It does so by processing the Log of the Origin Connection, and creating
;; a new transaction for each entry, writing it to the Destination Connection.
;; *************************************
;; ## The problem
;; *************************************
;; We need to erase data from the history, which is the intention of Datomic Excision.
;; However, Datomic Excision is not suitable in some cases (for instance, it does not erase
;; data of :db/fulltext attributes), and is not available in some deployments (Datomic Cloud).
;; *************************************
;; ## Strategy
;; *************************************
;; The difficulty in migrating data from a Datomic db to another is 'Entity Id Renumbering':
;; A given domain entity will not have the same :db/id in the Origin db and the Destination db,
;; which makes it difficult to preserve the relationships between these entities.
;; To combat this, we introduce a new :db.type/long, :db.unique/identity attribute in the Destination db,
;; which keeps track of Entity Id that each entity had in the Origin db,
;; and we rely on upsert behaviour.
;; *************************************
;; ## Usage notes
;; *************************************
;; 1. The partition of entities are preserved.
;; 2. It is safe to stop and restart this job, it will pick up where it left.
;; 3. You may want to do the bulk of the migration with dev: connections on your local machine, to
;; get better performance and cheaper resource utilization. You can then backup/restore
;; the Destination db to the production deployment of the Destination Connection,
;; migrate the last datoms, then switch your production system from the Origin to the Destination deployment
;; (will probably require a small downtime).
;; 4. If some of the blacklisted attributes have changed their :db/ident in the history of the Origin db, use their present :db/ident.
;; 5. This example is for Peers, but you can easily adapt it to Clients.
;; 6. To be resilient to errors, the transactions are NOT pipelined.
;; *************************************
;; ## Application-specific params
;; *************************************
;; Adapt these to your needs:
(def migr-attribute-ident
"The ident of a new attribute which will be added to dest-conn
to keep track of the entity id of an entity in origin-conn."
:rewrite-db/v0-entid)
(def migr-attribute-doc
"The :db/id that this entity had in the v0 version of the database.")
(def blacklisted-attrs-idents
"The idents of some attributes which will not be used in the destination conn.
In the destination db, these attributes will be installed, but no datom having them in the attribute position
will be present."
;; in this example, the goal is to get rid of the :db/fulltext nature of any attribute,
;; because it prevents their data from being excised.
#{:db/fulltext})
;; *************************************
;; ## Usage
;; *************************************
;; Load this entire file in your REPL,
;; then manually execute the code in the (comment ... ) block.
;; You will need the Datomic Peer Library And clojure/core.async on the classpath.
(require '[datomic.api :as d])
(require '[clojure.core.async :as a])
(import '(java.util TreeMap NavigableMap Map Date))
(declare
;; Those will be implemented below
prepare-dest-conn!
migrate-data!)
(comment
(def orgn-conn (d/connect "... FIXME"))
(def dest-conn-uri "datomic:... FIXME")
;; creating and preparing the Destination Connection - you only need to do this once
;; ----------------------
(d/create-database dest-conn-uri)
(prepare-dest-conn!
orgn-conn
(d/connect dest-conn-uri))
;; ----------------------
(def dest-conn (d/connect dest-conn-uri))
;; migrating the data - returns immediately, work happens in another thread
(def stop!
(migrate-data! orgn-conn dest-conn))
;; Optional: if you want to interrupt the processing, invoke the returned function:
(stop!)
)
;(require 'sc.api) ;; Optional: use scope-capture for debugging - see: https://github.com/vvvvalvalval/scope-capture
;; *************************************
;; ## Implementation
;; *************************************
(defn prepare-dest-conn!
"Prepares the Destination Connection, by installing the schema and data
supporting the use of the migration attribute,
before the first :db/txInstant of the Origin Connection."
[orgn-conn dest-conn]
(when-not (empty? (d/tx-range (d/log dest-conn) nil nil))
(throw (ex-info "dest-conn should be empty." {})))
(let [t0 (-> (d/log orgn-conn)
(d/tx-range nil nil)
seq
(or (throw (ex-info
"orgn-conn has an empty Log"
{})))
first
:t)
^Date first-tx-instant
(-> (d/entity (d/db orgn-conn) (d/t->tx t0))
:db/txInstant)
dest-db (d/db dest-conn)
tx-install-attr
[[:db/add (d/tempid :db.part/tx) :db/txInstant
(Date. (long (-> first-tx-instant .getTime (- 2000))))]
{:db/cardinality :db.cardinality/one,
:db/index true,
:db.install/_attribute :db.part/db,
:db/id (d/tempid :db.part/db),
:db/ident migr-attribute-ident,
:db/valueType :db.type/long,
:db/doc migr-attribute-doc,
:db/unique :db.unique/identity}]
tx-install-start-idents-ids
(->>
(d/q '[:find ?orgn-e ?dest-e ?ident :in $orgn $dest :where
[$dest ?dest-e :db/ident ?ident]
[$orgn ?orgn-e :db/ident ?ident]]
(d/as-of (d/db orgn-conn) (dec t0))
dest-db)
(map (fn [[orgn-e dest-e]]
[:db/add dest-e migr-attribute-ident orgn-e]))
(into [[:db/add (d/tempid :db.part/tx) :db/txInstant
(Date. (long (-> first-tx-instant .getTime (- 1000))))]]))]
[@(d/transact-async dest-conn tx-install-attr)
@(d/transact-async dest-conn tx-install-start-idents-ids)]
))
(defn- find-last-migrated-t
"Finds the last Origin t which has been migrated to dest-conn,
or nil if no Log entry has been migrated to dest-conn."
[dest-conn]
(let [new-db (d/db dest-conn)
last-t (d/basis-t new-db)]
(when-let [old-tx-eid
(migr-attribute-ident (d/entity new-db (d/t->tx last-t)))]
(d/tx->t old-tx-eid))))
(defn- ident-finder
"Given a database value, returns a function which can quickly compute the ident of
an entity at any point in time."
[orng-db]
(let [eid->t->ident
(->>
(d/q '[:find ?e ?ident ?tx ?added :in $ :where
[?e :db/ident ?ident ?tx ?added]]
(d/history orng-db))
(group-by (fn [[e _ident _tx _added]] e))
(into {}
(map (fn [[e tuples]]
[e
(TreeMap.
^Map
(->> tuples
(sort-by (fn [[_p _ident tx added]] [tx added]))
(group-by (fn [[_p _ident tx _added]] (d/tx->t tx)))
(into {}
(map (fn [[t tuples]]
(case (count tuples)
1
(let [[_ ident _ added] (first tuples)]
(if added
[t ident]
[t nil]))
2
(let [[_ ident _ _] (last tuples)]
[t ident])))))))]))))]
(fn find-ident-at-t
([eid t]
(when-some [^NavigableMap t->ident (get eid->t->ident eid)]
(when-some [e (.floorEntry t->ident t)]
(.getValue e)))))))
(defn- safe-onto-chan
"A blocking version of onto-chan that doesn't have memory leaks."
([ch coll]
(safe-onto-chan ch coll true))
([ch coll close?]
(a/thread
(loop [vs (seq coll)]
(if (and vs (a/>!! ch (first vs)))
(recur (next vs))
(when close?
(a/close! ch)))))))
(defn migrate-data!
"Migrates data from orgn-conn to dest-conn, while removing some datoms and preserving history.
Returns immediately, returning a 0-arity 'stop!' function which may be called to interrupt the processing.
An optional, 0-arity callback function may be supplied, which will be called when the processing is done
(either because the whole log has been consumed, or because 'stop!' has been called, or because an error occured.
Processes the Log of orgn-conn, creating a transaction for each log entry, and transacting it to dest-conn,
while omitting datoms that have one of the blacklisted attributes in attribute position."
([orgn-conn dest-conn]
(migrate-data! orgn-conn dest-conn (constantly nil)))
([orgn-conn dest-conn on-done]
(let [orgn-db (d/db orgn-conn)
find-ident-at-t (ident-finder orgn-db)
orgn-ref-attrs
(set
(d/q '[:find [?a ...] :where
[?a :db/valueType :db.type/ref _ true]]
(d/history orgn-db)))
blacklisted-attr-eids
(into #{}
(map (fn [ident]
(d/entid orgn-db ident)))
blacklisted-attrs-idents)
=i+log-entrys=
(a/chan 64 (map-indexed vector))]
(safe-onto-chan
=i+log-entrys=
(d/tx-range
(d/log orgn-conn)
(when-some [last-t (find-last-migrated-t dest-conn)]
(inc last-t))
nil))
(a/thread
(time
(loop []
(if-some [i+le (a/<!! =i+log-entrys=)]
(let [[i log-entry] i+le]
(let [t (:t log-entry)]
(when (-> i (mod 10000) (= 0))
(printf "%tT Processed %d txes, now at t = %d \n"
(Date.) i t)))
(let [t (:t log-entry)
tx
(try
(->> log-entry :data
(remove
(fn [[_e a _v _tx _added?]]
(contains? blacklisted-attr-eids a)))
(map
(fn [[e a v _tx added?]]
(let [new-a
[migr-attribute-ident a]]
(if added?
{:db/id (d/tempid (find-ident-at-t (d/part e) t))
migr-attribute-ident e
new-a
(if (orgn-ref-attrs a)
{:db/id (d/tempid (find-ident-at-t (d/part v) t))
migr-attribute-ident v}
v)}
[:db/retract
[migr-attribute-ident e]
new-a
(if (orgn-ref-attrs a)
[migr-attribute-ident v]
v)]))))
vec)
(catch Throwable err
(let [new-db (d/db dest-conn)
old-as-of
(d/as-of orgn-db (:t log-entry))]
;(sc.api/spy t) ;; Uncomment to be able to easily reproduce the context of errors using scope-capture.
(a/close! =i+log-entrys=)
(throw
(ex-info
(str "Error when building tx number " i ", aborted processing.")
{:i i :t t
:log-entry log-entry}
err)))))]
(try
@(d/transact-async dest-conn tx)
(catch Throwable err
(let [new-db (d/db dest-conn)
old-as-of
(d/as-of orgn-db (:t log-entry))]
;(sc.api/spy t) ;; Uncomment to be able to easily reproduce the context of errors using scope-capture.
(a/close! =i+log-entrys=)
(throw
(ex-info
(str "Error when transacting tx number " i ", aborted processing.")
{:tx tx
:i i :t t
:log-entry log-entry}
err))))))
(recur))
(do
(printf "%tT Done migrating, now at Origin's t = %d \n"
(Date.)
(find-last-migrated-t dest-conn))
(on-done))))))
(fn stop! []
(a/close! =i+log-entrys=)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment