Skip to content

Instantly share code, notes, and snippets.

@nivekuil
Last active January 9, 2024 06:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nivekuil/f10121ed05703f2c8377aa2de9557849 to your computer and use it in GitHub Desktop.
Save nivekuil/f10121ed05703f2c8377aa2de9557849 to your computer and use it in GitHub Desktop.
missionary flow managing effects from diffs
;; Essentially we want to build a tee: pass diffs through for rendering while also taking advantage of missionary's
;; supervision for mount/unmount effects. To do that, each diff has to correspond to an identity. Third time's the charm?
;; Hand-driven input flow: subscribing to it publishes the observe callback
;; as the global var `send!`, so values can be pushed in from the REPL.
;; The cleanup thunk only reports that the subscription ended.
(def sender
  (y/observe
    (fn [emit!]
      (def send! emit!)
      (fn [] (prn "sender dead")))))
;; Starts `flow` by reducing it into a task and running that task with `prn`
;; as both the success and the failure continuation. The reducing function
;; ignores the accumulator and keeps the latest value. Returns whatever
;; invoking the task returns (this file calls it later to cancel the run).
(defn boot [flow]
  (let [keep-latest (fn [_ x] x)
        task (y/reduce keep-latest nil flow)]
    (task prn prn)))
;; Experiment 1: a numeric counter. Each value received from `sender` is added
;; to `!state` when its observe subject runs and subtracted again by the
;; cleanup thunk of that same subject.
(def !state (atom 0))
(def flow
  (y/ap
    (let [delta (y/?= sender)
          mounted (y/observe
                    (fn [emit!]
                      ;; mount effect: apply the delta and emit the new total
                      (emit! (swap! !state + delta))
                      ;; unmount effect: undo the delta
                      (fn [] (swap! !state - delta))))
          total (y/?> mounted)]
      (prn total))))
;; Start the flow; `ps` holds the value returned by running the task and is
;; invoked below to stop the run.
(def ps (boot flow))
;; Push one diff through `sender`; its mount effect adds 3 to the counter.
(send! 3)
(assert (= 3 @!state))
;; Stop the run; the cleanup thunk is expected to subtract the diff again.
(ps)
(assert (= 0 @!state))
;;; incrementally merging into map by :id, like a user joining a room
;; Experiment 2 state: a map keyed by user id.
(def !state (atom {}))
;; Flow standing for the presence of user `id`: it emits a single nil and
;; prints "entered" when its subject runs; its cleanup thunk prints the
;; user's last known value and removes the user from `!state`.
(defn <user [id]
  (y/observe
    (fn [emit!]
      (emit! nil)
      (prn id "entered")
      (fn []
        (prn id "left, value is" (get @!state id))
        (swap! !state dissoc id)))))
;; Experiment 2 pipeline: merge every incoming diff into `!state` under its
;; :id, mounting a `<user` flow the first time an id has no entry yet.
;; A diff flagged :cancel? throws from inside the `ap` body.
(def flow
  (y/ap
    (let [change (y/?= sender)
          user-id (:id change)]
      ;; TODO how to cancel only the processing branch corresponding to id?
      (when (:cancel? change)
        (throw (ex-info "cancelled" {})))
      ;; no entry yet for this id: mount the user before merging
      (when-not (get (deref !state) user-id)
        (y/?> (<user user-id)))
      (swap! !state update user-id merge change)
      (prn user-id " updated " change))))
;; Start the experiment-2 flow.
(def ps (boot flow))
;; Two diffs for the same id: the second one should be merged into the first.
(send! {:id 1 :foo "foo"})
(send! {:id 1 :bar "bar"})
(assert (= {:id 1 :foo "foo" :bar "bar"} (@!state 1)))
;; A :cancel? diff makes the flow body throw; user 1 is expected to be gone
;; from the state afterwards.
(send! {:id 1 :cancel? true})
(assert (= nil (@!state 1)))
;; Invoke the value returned by `boot` to stop the run.
(ps)
;;; try to make it more generic, with granular (per-id) cancelling
;; Builds a flow that passes diffs from `<in` through while tying a
;; mount/unmount effect to each distinct id.
;;   kf         - applied to a diff to obtain its id
;;   vf         - applied to a diff to obtain the value that is emitted
;;   on-mount   - called with v inside the observe subject, when an id is first seen
;;   on-unmount - called with v from the observe cleanup thunk
;;   cancel?    - applied to a diff; truthy means "cancel the branch for this id"
;;   <in        - input flow of diffs
;; The atom `s` maps each mounted id to the dfv created for its branch.
(defn flow [kf vf on-mount on-unmount cancel? <in]
(let [s (atom {})]
(y/ap
(let [diff (y/?= <in)
id (kf diff)
v (vf diff)
;; non-nil only when this id already has a registered branch
cancel! (get @s id)]
(if cancel!
;; Known id: first emit the result of the optional cancel call
;; (nil when the diff is not a cancel diff), then emit v itself.
;; NOTE(review): presumably `(cancel! v)` assigns the dfv and wakes the
;; branch parked on `(y/? cancelled)` below - confirm against missionary docs.
(y/amb
(when (cancel? diff) (cancel! v))
v)
;; Unknown id: run a nested flow for it. A missionary.Cancelled thrown
;; from inside is swallowed here, so the try form yields nil in that case.
(try (y/?> (y/ap
(let [cancelled (y/dfv)]
;; The observe subject emits v, registers the dfv under the id and
;; then runs the mount effect; its cleanup thunk deregisters the id
;; and runs the unmount effect.
(y/?> (y/observe (fn [!]
(! v)
(swap! s assoc id cancelled)
(on-mount v)
(fn [] (swap! s dissoc id)
(on-unmount v)))))
;; Emit v, then wait on the dfv; a truthy value there is turned
;; into a Cancelled exception.
(y/amb
v
(when (y/? cancelled)
(throw (missionary.Cancelled. "received cancel diff")))))))
(catch missionary.Cancelled _)))))))
;; Experiment 3 state: a map keyed by :id.
(def !state (atom {}))
;; Wire the generic `flow` to `sender` and fold every emitted diff into
;; `!state`. Mounting prints "entered"; unmounting prints "left" and removes
;; the id from the state. nil emissions and :cancel? diffs are skipped by the
;; reducing function.
(def ps
  (boot
    (->> (flow :id identity
               #(prn % "entered")
               (fn [v]
                 (prn v "left")
                 (swap! !state dissoc (:id v)))
               :cancel?
               sender)
         (y/reductions
           (fn [_ v]
             (when (and v (not (:cancel? v)))
               (prn "DIFF" v)
               (swap! !state update (:id v) merge v)))
           ;; Explicit init: without it the seed comes from calling the
           ;; reducing function with zero arguments, which this 2-arity fn
           ;; does not support (ArityException on the JVM).
           nil))))
;; Two diffs for id 1 and one for id 2; diffs sharing an id are merged.
(send! {:id 1 :foo "foo"})
(send! {:id 1 :bar "bar"})
(send! {:id 2 :bar "bar"})
(assert (= {:id 1 :foo "foo" :bar "bar"} (@!state 1)))
;; Cancelling id 1 is expected to unmount only that id and leave id 2 in place.
(send! {:id 1 :cancel? true})
(assert (= nil (@!state 1)))
(assert (= {2 {:id 2 :bar "bar"}} @!state))
;; Stopping the whole run is expected to unmount the remaining ids.
(ps)
(assert (= {} @!state))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment