Skip to content

Instantly share code, notes, and snippets.

@zentrope
Last active August 29, 2015 14:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zentrope/edadf63ec60c1ffc5fb0 to your computer and use it in GitHub Desktop.
Save zentrope/edadf63ec60c1ffc5fb0 to your computer and use it in GitHub Desktop.
file-atom: core.async, promises, atomic-reference
(ns afile.gambit-3
(:refer-clojure :exclude [deref])
(:require
[clojure.edn :as edn]
[clojure.pprint :refer [pprint]]
[clojure.core.async :refer [go <! put! chan]]
[clojure.test :refer [deftest is run-tests]]))
(defn- load-file!
[file init-value]
(if (.exists file)
(-> file slurp edn/read-string)
init-value))
(defn write!
[f x]
(try
(spit f (with-out-str (pprint x)))
(catch Throwable t
(println "error" t))))
(defn go-write!
[file ch]
(go (loop []
(when-let [[x p] (<! ch)]
(write! file x)
(when p (deliver p x))
(recur)))))
(defn retry!
[fatom state f]
(loop []
(when-not (compare-and-set! fatom @fatom (f @fatom))
(recur)))
fatom)
(deftype FileAtom [state out-ch sync?]
clojure.lang.IDeref
(deref [this]
(.get state))
clojure.lang.IBlockingDeref
(deref [this time timeout-value]
(.get state))
clojure.lang.IAtom
(reset [this value]
(let [p (when sync? (promise))]
(locking state
(.set state value)
(put! out-ch [value p])
(when p (clojure.core/deref p)))
this))
(compareAndSet [this old-val new-val]
(locking state ;; Otherwise, same prob as add-watch on atoms
(if-let [ret (.compareAndSet state old-val new-val)]
(let [p (when sync? (promise))]
(put! out-ch [(.get state) p])
(when p (clojure.core/deref p))
true)
false)))
(swap [this f]
(retry! this state f))
(swap [this f arg]
(retry! this state #(apply f [% arg])))
(swap [this f arg1 arg2]
(retry! this state #(apply f [% arg1 arg2])))
(swap [this f x y args]
(retry! this state #(apply f (list* % x y args)))))
(defn fatom
([fname]
(fatom fname nil))
([fname init-value]
(fatom fname init-value true))
([fname init-value sync?]
(let [file (java.io.File. fname)
value (load-file! file init-value)
out-ch (chan 10)
cache (java.util.concurrent.atomic.AtomicReference. value)]
(go-write! file out-ch)
(FileAtom. cache out-ch sync?))))
;;-----------------------------------------------------------------------------
(deftest file-backed-atom
(.delete (java.io.File. "test.edn"))
(let [data (fatom "test.edn" {:x 10} true)
_ (reset! data {:x 0})
num-threads 300
runs (for [x (range num-threads)]
(future (swap! data #(assoc % :x (inc (:x %))))))]
(doseq [r runs] @r)
(is (= {:x num-threads} @data))
(let [file-contents (-> "test.edn" slurp edn/read-string)]
(is (= {:x num-threads} file-contents)))))
(deftest file-backed-atom-async
(.delete (java.io.File. "test.edn"))
(let [data (fatom "test.edn" {:x 10} false)
_ (reset! data {:x 0})
num-threads 300
runs (for [x (range num-threads)]
(future (swap! data #(assoc % :x (inc (:x %))))))]
(doseq [r runs] @r)
(is (= {:x num-threads} @data))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment