Skip to content

Instantly share code, notes, and snippets.

@peter-wilkins
Created October 5, 2023 15:23
Show Gist options
  • Save peter-wilkins/98a2fe47aa5024b85cda7dbe5ace07f0 to your computer and use it in GitHub Desktop.
Save peter-wilkins/98a2fe47aa5024b85cda7dbe5ace07f0 to your computer and use it in GitHub Desktop.
code for blog post: Durable Executions with Coffee Grinders
(ns true-fdat
(:require [cognitect.transit :as transit]
[dvlopt.fdat :as fdat :refer [?]]
[dvlopt.fdat.plugins.transit :as fdat.plugins.transit])
#?(:clj (:import [clojure.lang PersistentQueue])))
#_{:deps {dvlopt/fdat.plugins.transit {:mvn/version "0.0.0-beta3"}
com.cognitect/transit-clj {:mvn/version "1.0.333"}}}
; code for blog post: Durable Executions with Coffee Grinders
(def queue
#?(:clj PersistentQueue/EMPTY
:cljs (.-EMPTY PersistentQueue)))
(def readers
{"queue" (transit/read-handler
(fn [q] (into queue q)))})
(def writers
{PersistentQueue (transit/write-handler "queue" (fn [q] (into [] q)))})
;; Serialization in Clojure, for example:
(import '(java.io ByteArrayInputStream
ByteArrayOutputStream))
(defn serialize
[x]
#?(:clj (let [out (ByteArrayOutputStream. 512)]
(transit/write (transit/writer out
:json
(update (fdat.plugins.transit/writer-options) :handlers merge writers))
x)
out)
:cljs (transit/write (transit/writer :json
(fdat.plugins.transit/writer-options))
x)))
(defn deserialize
[x]
#?(:clj (transit/read (transit/reader (ByteArrayInputStream. (.toByteArray x))
:json
{:handlers (merge (fdat.plugins.transit/handler-in) readers)}))
:cljs (transit/read (transit/reader :json
{:handlers (fdat.plugins.transit/handler-in)})
x)))
(comment
(-> queue
(conj 1 2 3 4)
serialize
deserialize
clojure.pprint/pprint)
)
(defn fn->name [f] (-> f meta :dvlopt.fdat/key name))
(defn pp-fns [ctx]
(update ctx :queue #(apply conj queue (map fn->name (seq %)))))
;;;;;;;;;;;;; v1 basics
(defn inc-counter [ctx] (update ctx :counter inc))
(defn run [{:keys [queue] :as ctx}]
(if-let [[step & next-queue] (seq queue)]
(recur (step (assoc ctx :queue next-queue)))
ctx))
(run {:counter 0
:queue [inc-counter inc-counter inc-counter]})
;;=> {:counter 3 :queue nil}
;;;;;;;;;;;;;;;;; v2 storing each ctx in atom after each step
(def history (atom []))
(? (defn inc-counter [ctx] (update ctx :counter inc)))
(fdat/register [inc-counter])
(defn run [{:keys [queue] :as ctx}]
(if-let [step (peek queue)]
(let [result (-> ctx step (update :queue pop))]
(swap! history conj result)
(recur result))
ctx))
(run {:counter 0
:queue (conj PersistentQueue/EMPTY inc-counter inc-counter inc-counter)})
(->> @history
(map pp-fns) ; just making functions more readable in output
clojure.pprint/pprint) ; makes queues look cool
;; =>
;; ({:counter 1, :queue <-("inc-counter" "inc-counter")-<}
;; {:counter 2, :queue <-("inc-counter")-<}
;; {:counter 3, :queue <-()-<}
;;;;;;;;;;;;;;;;; v3 if a step throws it is still on queue in stored version
(def latest-version (atom nil))
(? (defn bang [ctx] (assoc ctx :error true)))
(fdat/register [bang])
(defn run [{:keys [queue] :as ctx}]
(if-let [step (peek queue)]
(let [{:keys [error] :as result} (-> ctx step (update :queue pop))]
(if error
(println "Alert!")
(do
(reset! latest-version result)
(recur result))))
(dissoc ctx :queue)))
(run {:counter 0
:queue (conj PersistentQueue/EMPTY inc-counter inc-counter bang inc-counter)})
(->> @latest-version
pp-fns
clojure.pprint/pprint)
;; => {:counter 2, :queue <-("bang" "inc-counter")-<}
(run @latest-version)
;; => {:counter 3, :error false} NOPE!
;;;;;;;;;;;;;;;; v4 serialization fixes it?
(defn run [{:keys [queue] :as ctx}]
(if-let [step (peek queue)]
(let [{:keys [error] :as result} (-> ctx step (update :queue pop))]
(if error
(println "Alert!")
(do
(reset! latest-version (serialize result))
(recur result))))
(dissoc ctx :queue)))
(run {:counter 0
:queue (conj PersistentQueue/EMPTY inc-counter inc-counter bang inc-counter)})
(->> @latest-version
deserialize
pp-fns
clojure.pprint/pprint)
;; => {:counter 2, :queue <-("bang" "inc-counter")-<}
; once bang is fixed we can resume
(? (defn bang [ctx] (assoc ctx :error false)))
(fdat/register [bang])
(run (deserialize @latest-version))
; => {:counter 3, :error false} Yay!
;;;;;;;;;;;;;;;;; Or skip the broken step
(-> @latest-version
deserialize
(update :queue pop)
run)
;; => {:counter 3}
(comment
(-> (run {:counter 0
:queue (conj queue inc-counter inc-counter bang inc-counter)})
clojure.pprint/pprint)
(-> @db
deserialize
run
clojure.pprint/pprint)
(map meta (seq (:queue (deserialize @db))))
@db
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment