in response to http://oobaloo.co.uk/multiplexing-work-reliably-and-efficiently-with-state-machines-and-core-dot-async .... so (start) will give you a channel to put initial states on.. or you can comment-in line 70
(ns async-state-progressor.progressor | |
(:require [clojure.core.async :refer [go-loop >! <! thread chan timeout onto-chan]])) | |
(defn log-state [msg current-state] | |
(locking *out* (println msg (:state current-state)))) | |
(defn download-report [{:keys [client date] :as state}] | |
;; snip | |
(assoc state :file "dummy-file")) | |
(defn upload-to-s3 [{:keys [file] :as state}] | |
;; snip | |
(assoc state :bucket "dummy-bucket" :key "dummy-key")) | |
(defrecord Transition [op next-state]) | |
(defn completed? [{:keys [state]}] | |
(= :completed state)) | |
(defn transition [{:keys [state] :as current-state}] | |
(log-state "transition from " current-state) | |
(condp = state | |
:downloadable (Transition. download-report :uploadable) | |
:uploadable (Transition. upload-to-s3 :completed))) | |
(defn step [current-state] | |
(let [{:keys [op next-state]} (transition current-state)] | |
(try (assoc (op current-state) :state next-state) | |
(catch Exception e | |
(assoc current-state :error e))))) | |
(defn progressor [states-ch completed-ch] | |
(go-loop [current-state (<! states-ch)] | |
(log-state "current-state: " current-state) | |
(if (completed? current-state) | |
(do (log-state "sending completed.. " current-state) (>! completed-ch current-state)) | |
(let [{:keys [error] :as new-state} (<! (thread (step current-state)))] | |
(log-state "new-state: " new-state) | |
(if error | |
(do (println "Error: " (:error new-state)) | |
(<! (timeout 5000)) ; delay the retry for 5s | |
(>! states-ch current-state)) | |
(and new-state (do (log-state "sending state.. " new-state) (>! states-ch new-state)))))) | |
(recur (<! states-ch)))) | |
(defn initial-state [date client] | |
{:date date | |
:client client | |
:state :downloadable}) | |
(def completed-ch (chan)) | |
(def initial-states (repeat 42 (initial-state "dummy-date" "dummy-client"))) | |
(defn start [] | |
(let [states-ch (chan 1024)] | |
(doseq [_ (range 10)] | |
(progressor states-ch completed-ch)) | |
;; snip- creates sequence of initial states | |
;; using initial-state fn. | |
#_(doseq [state initial-states] | |
(>!! states-ch state)) | |
(onto-chan states-ch (repeat 15 (initial-state "dummy-date" "dummy-client")) false) | |
;; snip- wait for all operations to complete | |
(go-loop [compl (<! completed-ch)] | |
(log-state "COMPLETE: " compl) | |
(recur (<! completed-ch))) | |
states-ch)) |
This comment has been minimized.
This comment has been minimized.
can you retry ? |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
I am getting this error:
Assert failed: Can't recur here at frame at line 46.
This is with the following project.clj:
(defproject progressor "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.6.0"]
[org.clojure/core.async "0.1.346.0-17112a-alpha"]])
I haven't really looked into it any more.