Last active September 3, 2023 09:25
In response to http://oobaloo.co.uk/multiplexing-work-reliably-and-efficiently-with-state-machines-and-core-dot-async. Calling (start) gives you a channel to put initial states on, or you can comment-in line 70 (the #_(doseq ...) form).
(ns async-state-progressor.progressor
  (:require [clojure.core.async :refer [go-loop >! >!! <! thread chan timeout onto-chan]]))

(defn log-state [msg current-state]
  (locking *out* (println msg (:state current-state))))

(defn download-report [{:keys [client date] :as state}]
  ;; snip
  (assoc state :file "dummy-file"))

(defn upload-to-s3 [{:keys [file] :as state}]
  ;; snip
  (assoc state :bucket "dummy-bucket" :key "dummy-key"))

(defrecord Transition [op next-state])

(defn completed? [{:keys [state]}]
  (= :completed state))

(defn transition [{:keys [state] :as current-state}]
  (log-state "transition from " current-state)
  (condp = state
    :downloadable (Transition. download-report :uploadable)
    :uploadable (Transition. upload-to-s3 :completed)))

(defn step [current-state]
  (let [{:keys [op next-state]} (transition current-state)]
    (try (assoc (op current-state) :state next-state)
         (catch Exception e
           (assoc current-state :error e)))))

(defn progressor [states-ch completed-ch]
  (go-loop [current-state (<! states-ch)]
    (log-state "current-state: " current-state)
    (if (completed? current-state)
      (do (log-state "sending completed.. " current-state) (>! completed-ch current-state))
      (let [{:keys [error] :as new-state} (<! (thread (step current-state)))]
        (log-state "new-state: " new-state)
        (if error
          (do (println "Error: " (:error new-state))
              (<! (timeout 5000)) ; delay the retry for 5s
              (>! states-ch current-state))
          (and new-state (do (log-state "sending state.. " new-state) (>! states-ch new-state))))))
    (recur (<! states-ch))))

(defn initial-state [date client]
  {:date date
   :client client
   :state :downloadable})

(def completed-ch (chan))

(def initial-states (repeat 42 (initial-state "dummy-date" "dummy-client")))

(defn start []
  (let [states-ch (chan 1024)]
    (doseq [_ (range 10)]
      (progressor states-ch completed-ch))
    ;; snip- creates sequence of initial states
    ;; using initial-state fn.
    #_(doseq [state initial-states]
        (>!! states-ch state))
    (onto-chan states-ch (repeat 15 (initial-state "dummy-date" "dummy-client")) false)
    ;; snip- wait for all operations to complete
    (go-loop [compl (<! completed-ch)]
      (log-state "COMPLETE: " compl)
      (recur (<! completed-ch)))
    states-ch))
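
To see the state machine on its own, without any channels or threads, here is a minimal sketch that drives one state from :downloadable to :completed synchronously. It is not part of the gist: fake-download, fake-upload, the transitions map and run-to-completion are hypothetical names made up for illustration, and the map stands in for the condp in transition.

```clojure
;; Hypothetical stand-ins for the gist's download-report and upload-to-s3.
(defn fake-download [state] (assoc state :file "dummy-file"))
(defn fake-upload [state] (assoc state :bucket "dummy-bucket" :key "dummy-key"))

;; Each non-terminal state maps to the operation to run and the state that follows.
(def transitions
  {:downloadable {:op fake-download :next-state :uploadable}
   :uploadable   {:op fake-upload   :next-state :completed}})

;; Run the operation for the current state, then tag the result with the next state.
(defn step [current-state]
  (let [{:keys [op next-state]} (transitions (:state current-state))]
    (assoc (op current-state) :state next-state)))

;; Keep stepping until the state machine reaches :completed.
(defn run-to-completion [initial]
  (loop [s initial]
    (if (= :completed (:state s))
      s
      (recur (step s)))))

(def result
  (run-to-completion {:date "dummy-date" :client "dummy-client" :state :downloadable}))
```

The progressor does the same stepping, except that each step runs on a thread and the intermediate state is put back on the shared channel, so any of the go-loops can pick it up.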
Can you retry?
Sorry, I messed up the comments. It should use core.async's "onto-chan" instead of the (doseq ...) part (which also works, if you comment it in).
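
For comparison, a small sketch of the two ways of feeding a channel mentioned here, assuming core.async is on the classpath. The channel names, buffer size and the count of 3 are arbitrary choices for illustration, and initial-state is copied from the gist so the snippet stands alone. Note that the doseq variant needs >!! to be referred in the ns form. As far as I know, newer core.async releases deprecate onto-chan in favour of onto-chan!, but the version quoted in this thread has onto-chan.

```clojure
(require '[clojure.core.async :refer [chan onto-chan >!! <!!]])

;; Copied from the gist so this snippet is self-contained.
(defn initial-state [date client]
  {:date date
   :client client
   :state :downloadable})

(def states (repeat 3 (initial-state "dummy-date" "dummy-client")))

;; Option 1: onto-chan copies the collection onto the channel asynchronously.
;; Passing false as the third argument leaves the channel open afterwards.
(def ch-a (chan 16))
(onto-chan ch-a states false)

;; Option 2: blocking puts from the calling thread.
(def ch-b (chan 16))
(doseq [state states]
  (>!! ch-b state))

;; Take the three states back off each channel to compare the results.
(def from-a (vec (repeatedly 3 #(<!! ch-a))))
(def from-b (vec (repeatedly 3 #(<!! ch-b))))
```

Both channels end up holding the same states; the difference is that onto-chan returns immediately while the doseq blocks the caller whenever the buffer is full.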
I am getting this error:
Assert failed: Can't recur here at frame at line 46.
This is with the following project.clj:
(defproject progressor "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [org.clojure/core.async "0.1.346.0-17112a-alpha"]])
I haven't really looked into it any more.