-
-
Save theronic/6cdf895d4f9b5801b38cfbf75c5658e3 to your computer and use it in GitHub Desktop.
Use CAS after Spice Write (write lock)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns permissions.scratch-optimistic-lockig | |
(:require [datomic.api :as d] | |
[permissions.protocols :as authz :refer (->user ->account)] | |
[permissions.spicedb.impl :as spice] | |
[permissions.spicedb.consistency :as consistency] | |
[permissions.spicedb.testing :as spice-testing] | |
[taoensso.timbre :as log]) | |
(:import (clojure.lang ExceptionInfo) | |
(java.util.concurrent Semaphore) | |
(datomic.impl Exceptions$IllegalStateExceptionInfo))) | |
(def semaphore (Semaphore. 2)) | |
(def datomic-uri "datomic:mem://spice") | |
(def !checks (atom 0)) | |
(def max-delay-ms 20) | |
(def client nil) | |
(defn qry-account-owners [db account-id] | |
(->> (d/q '[:find ?user | |
:in $ ?account | |
:where | |
[?account :account/owner ?user]] | |
db account-id) | |
(map first))) | |
(defn cas-conflict? | |
"Specific to Datomic Peer library. Cloud is different." | |
[ex] | |
(= :db.error/cas-failed (:db/error (ex-data (.getCause ex))))) | |
(defn sync-datomic->spice [client conn db account-id retries] | |
(let [db (d/db conn) ;; get fresh db. good idea? | |
account (d/entity db account-id) | |
version (or (:spice/version account) 0) | |
desired-users (set (qry-account-owners db account-id)) | |
current-users (->> (authz/read-relationships client {:resource/type :account | |
:resource/relation :owner | |
:subject/type :user}) | |
(map (comp parse-long :id :subject)) | |
(set)) | |
unwanted-users (clojure.set/difference current-users desired-users) | |
users-to-add (clojure.set/difference desired-users current-users)] | |
(try | |
(.acquire semaphore) ; gRPC has parallel write limit in single process. | |
(authz/update-relationships! client | |
(into | |
(for [unwanted-user-id unwanted-users] | |
[:delete (->user unwanted-user-id) :owner (->account account-id)]) | |
(for [wanted-user-id users-to-add] | |
[:touch (->user wanted-user-id) :owner (->account account-id)]))) | |
(finally | |
(.release semaphore))) | |
; can except. | |
(try | |
; use CAS to increment Spice version in Datomic. Will conflict on concurrent write and retry. | |
@(d/transact conn [[:db/cas account-id :spice/version version (inc version)]]) ;; do we need deref here? | |
(catch Exception ex | |
(if (cas-conflict? ex) | |
(do | |
;(log/debug "Conflict. Retrying: " retries) | |
(if (pos? retries) | |
(sync-datomic->spice client conn (d/db conn) account-id (dec retries)) | |
(throw (java.lang.IllegalStateException. "Max retries exceeded.")))) | |
(throw ex)))))) | |
(defn task-add-user-to-account! [conn client account-id user-id] | |
(let [{:as report :keys [db-after]} @(d/transact conn [[:db/add account-id :account/owner user-id]])] | |
(future (sync-datomic->spice client conn db-after account-id 100)))) | |
(defn task-remove-user-from-account! [conn client account-id user-id] | |
(let [{:as report :keys [db-after]} @(d/transact conn [[:db/retract account-id :account/owner user-id]])] | |
(future (sync-datomic->spice client conn db-after account-id 100)))) | |
(defn is-user-admin-in-spice? [client user-id account-id] | |
(swap! !checks inc) | |
(authz/can? client (->user user-id) :admin (->account account-id) consistency/fully-consistent)) | |
(def !unwanted-access (atom 0)) | |
(def !unwanted-deny (atom 0)) | |
(def !expected-access (atom 0)) | |
(def !expected-denial (atom 0)) | |
(defn retry | |
[retries f & args] | |
(let [res (try {:value (apply f args)} | |
(catch Exception e | |
(if (zero? retries) | |
(do | |
(log/debug "retry giving up") | |
(throw e)) | |
{:exception e})))] | |
(if (:exception res) | |
(do | |
(log/debug "retrying. n: " retries) | |
(recur (dec retries) f args)) | |
(:value res)))) | |
(defn is-owner-in-datomic? [db user-id account-id] | |
(boolean (seq (d/datoms db :eavt account-id :account/owner user-id)))) | |
(defn run-sync-task-handler [conn num-times] | |
(let [{:keys [channel client]} (spice-testing/make-test-client) | |
conn (d/connect datomic-uri) ;; try using own conn | |
db (d/db conn) | |
user-id (:db/id (d/entity db [:user/ident :test/user])) | |
account-id (:db/id (d/entity db [:account/ident :test/account]))] | |
;(log/debug "run-sync user: " user-id ", account: " account-id, "admin? " (is-user-admin-in-spice? client user-id account-id)) | |
(dotimes [n num-times] | |
; we are attempting to find a race condition here | |
; Even though assign permission & revoke tasks are in order & Datomic is consistent, delays in writing | |
; to Spice can cause inconsistent Spice state. | |
(let [!assign (task-add-user-to-account! conn client account-id user-id) | |
!revoke (task-remove-user-from-account! conn client account-id user-id)] | |
@!assign @!revoke)) | |
;(let [!assign (retry 100 task-add-user-to-account! conn client account-id user-id) | |
; !revoke (retry 100 task-remove-user-from-account! conn client account-id user-id)])) | |
;@!assign @!revoke)) | |
(spice/shutdown-channel! channel))) | |
(defn run-async-test! [& {:keys [num-threads checks-per-thread]}] | |
(reset! !checks 0) | |
(reset! !unwanted-access 0) | |
(reset! !unwanted-deny 0) | |
(reset! !expected-access 0) | |
(reset! !expected-denial 0) | |
;; write Spice schema here if needed | |
;(let [{:keys [channel client]} (spice-testing/make-test-client)] | |
; (def client client) | |
; (def channel channel)) | |
(d/delete-database datomic-uri) | |
(d/create-database datomic-uri) | |
(def *conn (d/connect datomic-uri)) | |
@(d/transact *conn | |
[{:db/ident :user/ident | |
:db/valueType :db.type/keyword | |
:db/cardinality :db.cardinality/one | |
:db/unique :db.unique/identity} | |
{:db/ident :account/ident | |
:db/valueType :db.type/keyword | |
:db/cardinality :db.cardinality/one | |
:db/unique :db.unique/identity} | |
{:db/ident :account/owner | |
:db/valueType :db.type/ref | |
:db/cardinality :db.cardinality/many | |
:db/index true} | |
{:db/ident :spice/version | |
:db/valueType :db.type/long | |
:db/cardinality :db.cardinality/one}]) | |
(let [{:as report :keys [tempids]} | |
@(d/transact *conn | |
[{:db/id "account", :account/ident :test/account, :spice/version 0} | |
{:db/id "user", :user/ident :test/user, :spice/version 0}]) | |
user-id (get tempids "user") | |
account-id (get tempids "account") | |
{:keys [channel client]} (spice-testing/make-test-client)] | |
(def *account-id account-id) | |
(def *user-id user-id) | |
(def *client client) | |
; clear out relationship if it exists from previous run. | |
(when (is-user-admin-in-spice? client user-id account-id) | |
(log/warn "Clearing :owner from a previous run.") | |
(authz/delete-relationship! client (->user user-id) :owner (->account account-id))) | |
(let [futures (for [x (range num-threads)] | |
(future (run-sync-task-handler *conn checks-per-thread)))] | |
(mapv deref futures) | |
(try | |
(Thread/sleep 500) ;; wait for any Spice futures to complete. | |
(let [db (d/db *conn) | |
;unwanted-access-ratio (str (/ @!unwanted-access (+ @!expected-access @!unwanted-access)))] | |
x {:checks-per-thread @!checks | |
:inconsistent? (not= | |
(is-user-admin-in-spice? client user-id account-id) | |
(is-owner-in-datomic? db user-id account-id)) | |
;:unwanted-access-ratio unwanted-access-ratio | |
:expected-access @!expected-access | |
:expected-denial @!expected-denial | |
:unwanted-access @!unwanted-access | |
:unwanted-deny @!unwanted-deny}] | |
; (spice/shutdown-channel! channel) ; skip channel shutdown for debugging. | |
x))))) | |
(comment | |
(let [{:keys [channel client]} (spice-testing/make-test-client)] | |
(authz/write-schema! client (spice-testing/load-test-schema)) | |
(spice/shutdown-channel! channel)) | |
(run-async-test! :num-threads 2 :checks-per-thread 5) | |
(run-sync-task-handler *conn 1) | |
(qry-account-owners (d/db *conn) *account-id) | |
@(d/transact *conn [[:db/add *account-id :account/owner *user-id]]) | |
(is-owner-in-datomic? (d/db *conn) *user-id *account-id) | |
(is-user-admin-in-spice? *client *user-id *account-id) | |
(sync-datomic->spice *client (d/db *conn) *account-id) | |
(loop [] | |
(let [x (run-async-test! :num-threads 3 :checks-per-thread 5)] | |
(if (:inconsistent? x) | |
(log/warn "Unwanted persistent access." x) | |
(recur))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment