Skip to content

Instantly share code, notes, and snippets.

@lantiga
Created January 12, 2014 00:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lantiga/53d163fb00db2a754099 to your computer and use it in GitHub Desktop.
Save lantiga/53d163fb00db2a754099 to your computer and use it in GitHub Desktop.
asyncatom
(ns asyncatom.core
(:require [clojure.core.async :refer [chan go <! >! <!!]])
(:refer-clojure :exclude [atom swap! reset! compare-and-set! deref]))
(defn atom
  "Creates an asynchronous atom whose initial value is `v` and returns its
  command channel.

  A single go-loop owns the state and serves one command at a time. Every
  command is a vector `[cmd oc & args]`; the reply is put on the channel `oc`:

    [:get oc]               -> current state
    [:set oc nv]            -> nv (state becomes nv)
    [:cas oc ov nv]         -> [:acc nv] when state = ov, else [:rej state]
    [:get-in oc ks]         -> (get-in state ks)
    [:set-in oc nkv ks]     -> new state after (assoc-in state ks nkv)
    [:cas-in oc okv nkv ks] -> [:acc new-state] when (get-in state ks) = okv,
                               else [:rej state]
    [:get-m oc ks1 ks2 ...] -> vector of the values at each key path, in order
    [:set-m oc nkvm]        -> new state after assoc-in of every [ks nkv]
                               entry of nkvm
    [:cas-m oc okvm nkvm]   -> [:acc new-state] when every [ks okv] entry of
                               okvm matches the state, else [:rej state]

  The loop ends when the command channel is closed. Unknown commands leave
  the state unchanged and send no reply.

  Limitation: core.async channels cannot carry nil, so a command whose reply
  would be a bare nil (e.g. :get on a nil state, :get-in on a missing path)
  is not supported."
  [v]
  (let [c (chan)
        ;; Applies every [key-path value] entry of `kvs` to `state`.
        ;; The accumulator must be threaded through, otherwise only the last
        ;; entry would survive.
        assoc-all (fn [state kvs]
                    (reduce (fn [m [ks nkv]] (assoc-in m ks nkv)) state kvs))]
    (go
      (loop [v v]
        ;; (<! c) yields nil once `c` is closed; stop instead of dispatching
        ;; on a nil command.
        (when-let [[cmd oc & args] (<! c)]
          (let [nv (condp = cmd
                     :get (do (>! oc v) v)
                     :set (let [[nv] args] (>! oc nv) nv)
                     :cas (let [[ov nv] args]
                            (if (= ov v)
                              (do (>! oc [:acc nv]) nv)
                              (do (>! oc [:rej v]) v)))
                     :get-in (let [[ks] args] (>! oc (get-in v ks)) v)
                     :set-in (let [[nkv ks] args
                                   nv (assoc-in v ks nkv)]
                               (>! oc nv)
                               nv)
                     :cas-in (let [[okv nkv ks] args]
                               (if (= okv (get-in v ks))
                                 (let [nv (assoc-in v ks nkv)]
                                   (>! oc [:acc nv])
                                   nv)
                                 (do (>! oc [:rej v]) v)))
                     ;; Key paths are the trailing elements of the message.
                     ;; mapv realizes the result here rather than lazily on
                     ;; the consumer's side.
                     :get-m (do (>! oc (mapv #(get-in v %) args)) v)
                     :set-m (let [[nkvm] args
                                  nv (assoc-all v nkvm)]
                              (>! oc nv)
                              nv)
                     :cas-m (let [[okvm nkvm] args]
                              (if (every? (fn [[ks okv]] (= okv (get-in v ks))) okvm)
                                (let [nv (assoc-all v nkvm)]
                                  (>! oc [:acc nv])
                                  nv)
                                (do (>! oc [:rej v]) v)))
                     ;; Unknown command: keep the loop (and the atom) alive.
                     v)]
            (recur nv)))))
    c))
;; TODO:
;; - make add-watch a map indexed by ks, where watches can be triggered upon updates of sub-maps
;; - write ant colony demo as example
;;
;; In the ant colony demo, one of the key ingredients appears to be that the agent function is
;; retried when the transaction fails.
;; Explore the new -m functions. The ant colony could be written with agents and -m functions:
;; we can CAS on multiple grid positions at once and retry the swap if any of them has changed.
;; This achieves the minimal number of swaps.
(defn swap!
  "Replaces the value of the async atom `c` with `(f current-value)`.

  Reads the current value with :get, then proposes the update with :cas, and
  repeats both steps until the atom answers :acc. Blocks the calling thread
  and returns the accepted new value. `f` may be called more than once."
  [c f]
  (let [reply (chan)]
    (<!!
      (go
        (loop []
          (>! c [:get reply])
          (let [current (<! reply)]
            (>! c [:cas reply current (f current)])
            (let [[status result] (<! reply)]
              (if (= :acc status)
                result
                ;; Rejected: the value changed in between, so try again.
                (recur)))))))))
(defn reset!
  "Unconditionally sets the value of the async atom `c` to `v` by sending a
  :set command. Blocks the calling thread until the atom replies and returns
  that reply (the new value)."
  [c v]
  (let [reply (chan)
        request [:set reply v]]
    (<!! (go
           (>! c request)
           (<! reply)))))
(defn compare-and-set!
  "Sends a :cas command asking the async atom `c` to move from `ov` to `nv`.

  Blocks the calling thread and returns the atom's reply as-is:
  `[:acc nv]` when the swap was accepted, `[:rej current-value]` otherwise."
  [c ov nv]
  (let [reply (chan)
        request [:cas reply ov nv]]
    (<!! (go
           (>! c request)
           (<! reply)))))
(defn deref
  "Returns the current value of the async atom `c` by sending it a :get
  command and blocking the calling thread until the reply arrives."
  [c]
  (let [reply (chan)
        request [:get reply]]
    (<!! (go
           (>! c request)
           (<! reply)))))
(defn swap-in!
  "Replaces the value at key path `ks` inside the async atom `c` with
  `(f value-at-ks)`.

  Reads the nested value with :get-in, proposes the update with :cas-in, and
  repeats both steps until the atom answers :acc. Blocks the calling thread
  and returns the payload of the accepted reply, which is the atom's whole
  new state. `f` may be called more than once."
  [c f ks]
  (let [reply (chan)]
    (<!!
      (go
        (loop []
          (>! c [:get-in reply ks])
          (let [current (<! reply)]
            (>! c [:cas-in reply current (f current) ks])
            (let [[status result] (<! reply)]
              (if (= :acc status)
                result
                ;; Rejected: the nested value changed in between, so retry.
                (recur)))))))))
(defn reset-in!
  "Unconditionally sets the value at key path `ks` inside the async atom `c`
  to `v` by sending a :set-in command. Blocks the calling thread and returns
  the atom's reply (its whole new state)."
  [c v ks]
  (let [reply (chan)
        request [:set-in reply v ks]]
    (<!! (go
           (>! c request)
           (<! reply)))))
(defn compare-and-set-in!
  "Sends a :cas-in command asking the async atom `c` to change the value at
  key path `ks` from `ov` to `nv`.

  Blocks the calling thread and returns the atom's reply as-is:
  `[:acc new-state]` when accepted, `[:rej current-state]` otherwise."
  [c ov nv ks]
  (let [reply (chan)
        request [:cas-in reply ov nv ks]]
    (<!! (go
           (>! c request)
           (<! reply)))))
(defn deref-in
  "Returns the value at key path `ks` inside the async atom `c` by sending a
  :get-in command and blocking the calling thread until the reply arrives."
  [c ks]
  (let [reply (chan)
        request [:get-in reply ks]]
    (<!! (go
           (>! c request)
           (<! reply)))))
(defn swap-m!
  "Atomically updates several key paths of the async atom `c`.

  `ksfm` maps each key path to the function that computes its new value from
  its current value. The current values are read with :get-m, the update is
  proposed with :cas-m, and both steps repeat until the atom answers :acc.
  Blocks the calling thread and returns the payload of the accepted reply
  (the atom's whole new state). The functions may be called more than once."
  [c ksfm]
  (<!!
    (go
      (let [oc (chan)
            ;; keys and vals of the same map are returned in matching order,
            ;; so the n-th function belongs to the n-th key path.
            ks (keys ksfm)
            fs (vals ksfm)]
        (loop [[res out] [nil nil]]
          (if (= :acc res)
            out
            (do
              ;; :get-m takes the key paths as trailing message elements.
              (>! c (into [:get-m oc] ks))
              (let [vs (<! oc)
                    okvm (zipmap ks vs)
                    ;; Apply each function to the current value at its own path.
                    nkvm (zipmap ks (map (fn [f old] (f old)) fs vs))]
                (>! c [:cas-m oc okvm nkvm])
                (recur (<! oc))))))))))
(defn reset-m!
  "Unconditionally sets several key paths of the async atom `c` at once by
  sending a :set-m command. `nkvm` maps each key path to its new value.
  Blocks the calling thread and returns the atom's reply."
  [c nkvm]
  (let [reply (chan)
        request [:set-m reply nkvm]]
    (<!! (go
           (>! c request)
           (<! reply)))))
(defn compare-and-set-m!
  "Multi-path compare-and-set on the async atom `c`.

  `okvm` maps key paths to the values they are expected to hold; `nkvm` maps
  key paths to the values to install when every expectation holds. Blocks
  the calling thread and returns the atom's reply as-is:
  `[:acc new-state]` when accepted, `[:rej current-state]` otherwise."
  [c okvm nkvm]
  (<!!
    (go
      (let [oc (chan)]
        ;; Must be :cas-m: :cas-in would treat okvm/nkvm as single values and
        ;; compare them against a nil key path.
        (>! c [:cas-m oc okvm nkvm])
        (<! oc)))))
(defn deref-m
  "Returns the values found at each key path in `ksv` inside the async atom
  `c`, in the same order as `ksv`. Blocks the calling thread until the reply
  arrives."
  [c ksv]
  (<!!
    (go
      (let [oc (chan)]
        ;; :get-m takes the key paths as trailing message elements; :get-in
        ;; would misread the whole collection as one key path.
        (>! c (into [:get-m oc] ksv))
        (<! oc)))))
;; Benchmark/test helpers kept inside a `comment` form, so none of them is
;; evaluated when the namespace loads.
;; NOTE(review): async-atom, async-swap!, async-deref and async-swap-in! are
;; not defined in the visible source -- presumably earlier names for atom,
;; swap!, deref and swap-in!; confirm before evaluating.
;; NOTE(review): this namespace excludes clojure.core's atom/swap!/deref, so
;; the unprefixed atom-test helpers below resolve to the async versions here;
;; if they are meant as a clojure.core baseline they need qualifying -- confirm.
(comment
;; Applies a 1 ms-sleeping increment n times, one call after another, and
;; returns the final value.
(defn async-atom-test [n]
(let [a (async-atom 0)
increment (fn [x] (do (Thread/sleep 1) (inc x)))
f (partial async-swap! a increment)]
(doall (repeatedly n f))
(async-deref a)))
;; Same as async-atom-test, but runs the n swaps through pcalls.
(defn async-atom-parallel-test [n]
(let [a (async-atom 0)
increment (fn [x] (do (Thread/sleep 1) (inc x)))
f (partial async-swap! a increment)]
(doall (apply pcalls (repeat n f)))
(async-deref a)))
;; Sequential variant written against the unprefixed atom/swap!/deref names.
(defn atom-test [n]
(let [a (atom 0)
increment (fn [x] (do (Thread/sleep 1) (inc x)))
f (partial swap! a increment)]
(doall (repeatedly n f))
(deref a)))
;; pcalls variant written against the unprefixed atom/swap!/deref names.
(defn atom-parallel-test [n]
(let [a (atom 0)
increment (fn [x] (do (Thread/sleep 1) (inc x)))
f (partial swap! a increment)]
(doall (apply pcalls (repeat n f)))
(deref a)))
;; Single nested update: increments the value at [:b :c].
(defn async-atom-in-test []
(let [a (async-atom {:a 0 :b {:c 0}})]
(async-swap-in! a inc [:b :c])))
;; Interleaves n updates of [:a] with n updates of [:b :c] through pcalls and
;; returns the final value.
(defn async-atom-parallel-in-test [n]
(let [a (async-atom {:a 0 :b {:c 0}})
increment (fn [x] (do (Thread/sleep 1) (inc x)))
f1 (partial async-swap-in! a increment [:a])
f2 (partial async-swap-in! a increment [:b :c])]
;; TODO: how to check that f1 and f2 are proceeding independently?
(doall (apply pcalls (interleave (repeat n f1) (repeat n f2))))
(async-deref a))))
;; Scratch experiments; nothing in this `comment` form is evaluated on load.
(comment
;; Increments an atom from inside an agent action, waits for the agent, then
;; reads both values.
;; NOTE(review): the @ reader macro expands to clojure.core/deref, while
;; `atom` and `swap!` resolve to this namespace's versions here -- confirm
;; which atom implementation this snippet was meant to exercise.
(let [at (atom 0)
ag (agent 0)]
(send ag (fn [v] (swap! at inc) (inc v)))
(await ag)
[@at @ag])
;; Nested update on a vector of vectors: increments the element at [0 1].
;; NOTE(review): async-atom / async-swap-in! are not defined in the visible
;; source -- presumably atom / swap-in!; confirm.
(let [a (async-atom [[0 0 0] [1 1 1] [2 2 2]])]
(async-swap-in! a inc [0 1])))
;; REPL invocations for the helpers defined in the first `comment` form above:
;; timed sequential and pcalls runs of 1000 swaps, followed by the nested
;; (-in) tests. Not evaluated on load.
(comment
(time (atom-test 1000))
(time (async-atom-test 1000))
(time (atom-parallel-test 1000))
(time (async-atom-parallel-test 1000))
(async-atom-in-test)
(async-atom-parallel-in-test 100)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment