Skip to content

Instantly share code, notes, and snippets.

@theronic
Last active June 10, 2024 14:19
Show Gist options
  • Save theronic/6cdf895d4f9b5801b38cfbf75c5658e3 to your computer and use it in GitHub Desktop.
Save theronic/6cdf895d4f9b5801b38cfbf75c5658e3 to your computer and use it in GitHub Desktop.
Use CAS after Spice Write (write lock)
(ns permissions.scratch-optimistic-lockig
(:require [datomic.api :as d]
[permissions.protocols :as authz :refer (->user ->account)]
[permissions.spicedb.impl :as spice]
[permissions.spicedb.consistency :as consistency]
[permissions.spicedb.testing :as spice-testing]
[taoensso.timbre :as log])
(:import (clojure.lang ExceptionInfo)
(java.util.concurrent Semaphore)
(datomic.impl Exceptions$IllegalStateExceptionInfo)))
(def semaphore (Semaphore. 2))
(def datomic-uri "datomic:mem://spice")
(def !checks (atom 0))
(def max-delay-ms 20)
(def client nil)
(defn qry-account-owners [db account-id]
(->> (d/q '[:find ?user
:in $ ?account
:where
[?account :account/owner ?user]]
db account-id)
(map first)))
(defn cas-conflict?
"Specific to Datomic Peer library. Cloud is different."
[ex]
(= :db.error/cas-failed (:db/error (ex-data (.getCause ex)))))
(defn sync-datomic->spice [client conn db account-id retries]
(let [db (d/db conn) ;; get fresh db. good idea?
account (d/entity db account-id)
version (or (:spice/version account) 0)
desired-users (set (qry-account-owners db account-id))
current-users (->> (authz/read-relationships client {:resource/type :account
:resource/relation :owner
:subject/type :user})
(map (comp parse-long :id :subject))
(set))
unwanted-users (clojure.set/difference current-users desired-users)
users-to-add (clojure.set/difference desired-users current-users)]
(try
(.acquire semaphore) ; gRPC has parallel write limit in single process.
(authz/update-relationships! client
(into
(for [unwanted-user-id unwanted-users]
[:delete (->user unwanted-user-id) :owner (->account account-id)])
(for [wanted-user-id users-to-add]
[:touch (->user wanted-user-id) :owner (->account account-id)])))
(finally
(.release semaphore)))
; can except.
(try
; use CAS to increment Spice version in Datomic. Will conflict on concurrent write and retry.
@(d/transact conn [[:db/cas account-id :spice/version version (inc version)]]) ;; do we need deref here?
(catch Exception ex
(if (cas-conflict? ex)
(do
;(log/debug "Conflict. Retrying: " retries)
(if (pos? retries)
(sync-datomic->spice client conn (d/db conn) account-id (dec retries))
(throw (java.lang.IllegalStateException. "Max retries exceeded."))))
(throw ex))))))
(defn task-add-user-to-account! [conn client account-id user-id]
(let [{:as report :keys [db-after]} @(d/transact conn [[:db/add account-id :account/owner user-id]])]
(future (sync-datomic->spice client conn db-after account-id 100))))
(defn task-remove-user-from-account! [conn client account-id user-id]
(let [{:as report :keys [db-after]} @(d/transact conn [[:db/retract account-id :account/owner user-id]])]
(future (sync-datomic->spice client conn db-after account-id 100))))
(defn is-user-admin-in-spice? [client user-id account-id]
(swap! !checks inc)
(authz/can? client (->user user-id) :admin (->account account-id) consistency/fully-consistent))
(def !unwanted-access (atom 0))
(def !unwanted-deny (atom 0))
(def !expected-access (atom 0))
(def !expected-denial (atom 0))
(defn retry
[retries f & args]
(let [res (try {:value (apply f args)}
(catch Exception e
(if (zero? retries)
(do
(log/debug "retry giving up")
(throw e))
{:exception e})))]
(if (:exception res)
(do
(log/debug "retrying. n: " retries)
(recur (dec retries) f args))
(:value res))))
(defn is-owner-in-datomic? [db user-id account-id]
(boolean (seq (d/datoms db :eavt account-id :account/owner user-id))))
(defn run-sync-task-handler [conn num-times]
(let [{:keys [channel client]} (spice-testing/make-test-client)
conn (d/connect datomic-uri) ;; try using own conn
db (d/db conn)
user-id (:db/id (d/entity db [:user/ident :test/user]))
account-id (:db/id (d/entity db [:account/ident :test/account]))]
;(log/debug "run-sync user: " user-id ", account: " account-id, "admin? " (is-user-admin-in-spice? client user-id account-id))
(dotimes [n num-times]
; we are attempting to find a race condition here
; Even though assign permission & revoke tasks are in order & Datomic is consistent, delays in writing
; to Spice can cause inconsistent Spice state.
(let [!assign (task-add-user-to-account! conn client account-id user-id)
!revoke (task-remove-user-from-account! conn client account-id user-id)]
@!assign @!revoke))
;(let [!assign (retry 100 task-add-user-to-account! conn client account-id user-id)
; !revoke (retry 100 task-remove-user-from-account! conn client account-id user-id)]))
;@!assign @!revoke))
(spice/shutdown-channel! channel)))
(defn run-async-test! [& {:keys [num-threads checks-per-thread]}]
(reset! !checks 0)
(reset! !unwanted-access 0)
(reset! !unwanted-deny 0)
(reset! !expected-access 0)
(reset! !expected-denial 0)
;; write Spice schema here if needed
;(let [{:keys [channel client]} (spice-testing/make-test-client)]
; (def client client)
; (def channel channel))
(d/delete-database datomic-uri)
(d/create-database datomic-uri)
(def *conn (d/connect datomic-uri))
@(d/transact *conn
[{:db/ident :user/ident
:db/valueType :db.type/keyword
:db/cardinality :db.cardinality/one
:db/unique :db.unique/identity}
{:db/ident :account/ident
:db/valueType :db.type/keyword
:db/cardinality :db.cardinality/one
:db/unique :db.unique/identity}
{:db/ident :account/owner
:db/valueType :db.type/ref
:db/cardinality :db.cardinality/many
:db/index true}
{:db/ident :spice/version
:db/valueType :db.type/long
:db/cardinality :db.cardinality/one}])
(let [{:as report :keys [tempids]}
@(d/transact *conn
[{:db/id "account", :account/ident :test/account, :spice/version 0}
{:db/id "user", :user/ident :test/user, :spice/version 0}])
user-id (get tempids "user")
account-id (get tempids "account")
{:keys [channel client]} (spice-testing/make-test-client)]
(def *account-id account-id)
(def *user-id user-id)
(def *client client)
; clear out relationship if it exists from previous run.
(when (is-user-admin-in-spice? client user-id account-id)
(log/warn "Clearing :owner from a previous run.")
(authz/delete-relationship! client (->user user-id) :owner (->account account-id)))
(let [futures (for [x (range num-threads)]
(future (run-sync-task-handler *conn checks-per-thread)))]
(mapv deref futures)
(try
(Thread/sleep 500) ;; wait for any Spice futures to complete.
(let [db (d/db *conn)
;unwanted-access-ratio (str (/ @!unwanted-access (+ @!expected-access @!unwanted-access)))]
x {:checks-per-thread @!checks
:inconsistent? (not=
(is-user-admin-in-spice? client user-id account-id)
(is-owner-in-datomic? db user-id account-id))
;:unwanted-access-ratio unwanted-access-ratio
:expected-access @!expected-access
:expected-denial @!expected-denial
:unwanted-access @!unwanted-access
:unwanted-deny @!unwanted-deny}]
; (spice/shutdown-channel! channel) ; skip channel shutdown for debugging.
x)))))
(comment
(let [{:keys [channel client]} (spice-testing/make-test-client)]
(authz/write-schema! client (spice-testing/load-test-schema))
(spice/shutdown-channel! channel))
(run-async-test! :num-threads 2 :checks-per-thread 5)
(run-sync-task-handler *conn 1)
(qry-account-owners (d/db *conn) *account-id)
@(d/transact *conn [[:db/add *account-id :account/owner *user-id]])
(is-owner-in-datomic? (d/db *conn) *user-id *account-id)
(is-user-admin-in-spice? *client *user-id *account-id)
(sync-datomic->spice *client (d/db *conn) *account-id)
(loop []
(let [x (run-async-test! :num-threads 3 :checks-per-thread 5)]
(if (:inconsistent? x)
(log/warn "Unwanted persistent access." x)
(recur)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment