Created
October 5, 2023 15:23
-
-
Save peter-wilkins/98a2fe47aa5024b85cda7dbe5ace07f0 to your computer and use it in GitHub Desktop.
code for blog post: Durable Executions with Coffee Grinders
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns true-fdat | |
(:require [cognitect.transit :as transit] | |
[dvlopt.fdat :as fdat :refer [?]] | |
[dvlopt.fdat.plugins.transit :as fdat.plugins.transit]) | |
#?(:clj (:import [clojure.lang PersistentQueue]))) | |
#_{:deps {dvlopt/fdat.plugins.transit {:mvn/version "0.0.0-beta3"} | |
com.cognitect/transit-clj {:mvn/version "1.0.333"}}} | |
; code for blog post: Durable Executions with Coffee Grinders | |
(def queue | |
#?(:clj PersistentQueue/EMPTY | |
:cljs (.-EMPTY PersistentQueue))) | |
(def readers | |
{"queue" (transit/read-handler | |
(fn [q] (into queue q)))}) | |
(def writers | |
{PersistentQueue (transit/write-handler "queue" (fn [q] (into [] q)))}) | |
;; Serialization in Clojure, for example: | |
(import '(java.io ByteArrayInputStream | |
ByteArrayOutputStream)) | |
(defn serialize | |
[x] | |
#?(:clj (let [out (ByteArrayOutputStream. 512)] | |
(transit/write (transit/writer out | |
:json | |
(update (fdat.plugins.transit/writer-options) :handlers merge writers)) | |
x) | |
out) | |
:cljs (transit/write (transit/writer :json | |
(fdat.plugins.transit/writer-options)) | |
x))) | |
(defn deserialize | |
[x] | |
#?(:clj (transit/read (transit/reader (ByteArrayInputStream. (.toByteArray x)) | |
:json | |
{:handlers (merge (fdat.plugins.transit/handler-in) readers)})) | |
:cljs (transit/read (transit/reader :json | |
{:handlers (fdat.plugins.transit/handler-in)}) | |
x))) | |
(comment | |
(-> queue | |
(conj 1 2 3 4) | |
serialize | |
deserialize | |
clojure.pprint/pprint) | |
) | |
(defn fn->name [f] (-> f meta :dvlopt.fdat/key name)) | |
(defn pp-fns [ctx] | |
(update ctx :queue #(apply conj queue (map fn->name (seq %))))) | |
;;;;;;;;;;;;; v1 basics | |
(defn inc-counter [ctx] (update ctx :counter inc)) | |
(defn run [{:keys [queue] :as ctx}] | |
(if-let [[step & next-queue] (seq queue)] | |
(recur (step (assoc ctx :queue next-queue))) | |
ctx)) | |
(run {:counter 0 | |
:queue [inc-counter inc-counter inc-counter]}) | |
;;=> {:counter 3 :queue nil} | |
;;;;;;;;;;;;;;;;; v2 storing each ctx in atom after each step | |
(def history (atom [])) | |
(? (defn inc-counter [ctx] (update ctx :counter inc))) | |
(fdat/register [inc-counter]) | |
(defn run [{:keys [queue] :as ctx}] | |
(if-let [step (peek queue)] | |
(let [result (-> ctx step (update :queue pop))] | |
(swap! history conj result) | |
(recur result)) | |
ctx)) | |
(run {:counter 0 | |
:queue (conj PersistentQueue/EMPTY inc-counter inc-counter inc-counter)}) | |
(->> @history | |
(map pp-fns) ; just making functions more readable in output | |
clojure.pprint/pprint) ; makes queues look cool | |
;; => | |
;; ({:counter 1, :queue <-("inc-counter" "inc-counter")-<} | |
;; {:counter 2, :queue <-("inc-counter")-<} | |
;; {:counter 3, :queue <-()-<} | |
;;;;;;;;;;;;;;;;; v3 if a step throws it is still on queue in stored version | |
(def latest-version (atom nil)) | |
(? (defn bang [ctx] (assoc ctx :error true))) | |
(fdat/register [bang]) | |
(defn run [{:keys [queue] :as ctx}] | |
(if-let [step (peek queue)] | |
(let [{:keys [error] :as result} (-> ctx step (update :queue pop))] | |
(if error | |
(println "Alert!") | |
(do | |
(reset! latest-version result) | |
(recur result)))) | |
(dissoc ctx :queue))) | |
(run {:counter 0 | |
:queue (conj PersistentQueue/EMPTY inc-counter inc-counter bang inc-counter)}) | |
(->> @latest-version | |
pp-fns | |
clojure.pprint/pprint) | |
;; => {:counter 2, :queue <-("bang" "inc-counter")-<} | |
(run @latest-version) | |
;; => {:counter 3, :error false} NOPE! | |
;;;;;;;;;;;;;;;; v4 serialization fixes it? | |
(defn run [{:keys [queue] :as ctx}] | |
(if-let [step (peek queue)] | |
(let [{:keys [error] :as result} (-> ctx step (update :queue pop))] | |
(if error | |
(println "Alert!") | |
(do | |
(reset! latest-version (serialize result)) | |
(recur result)))) | |
(dissoc ctx :queue))) | |
(run {:counter 0 | |
:queue (conj PersistentQueue/EMPTY inc-counter inc-counter bang inc-counter)}) | |
(->> @latest-version | |
deserialize | |
pp-fns | |
clojure.pprint/pprint) | |
;; => {:counter 2, :queue <-("bang" "inc-counter")-<} | |
; once bang is fixed we can resume | |
(? (defn bang [ctx] (assoc ctx :error false))) | |
(fdat/register [bang]) | |
(run (deserialize @latest-version)) | |
; => {:counter 3, :error false} Yay! | |
;;;;;;;;;;;;;;;;; Or skip the broken step | |
(-> @latest-version | |
deserialize | |
(update :queue pop) | |
run) | |
;; => {:counter 3} | |
(comment | |
(-> (run {:counter 0 | |
:queue (conj queue inc-counter inc-counter bang inc-counter)}) | |
clojure.pprint/pprint) | |
(-> @db | |
deserialize | |
run | |
clojure.pprint/pprint) | |
(map meta (seq (:queue (deserialize @db)))) | |
@db | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment