in response to .... so (start) will give you a channel to put initial states on.. or you can comment-in line 70
(ns async-state-progressor.progressor
(:require [clojure.core.async :refer [go-loop >! <! thread chan timeout onto-chan]]))
(defn log-state [msg current-state]
(locking *out* (println msg (:state current-state))))
(defn download-report [{:keys [client date] :as state}]
;; snip
(assoc state :file "dummy-file"))
(defn upload-to-s3 [{:keys [file] :as state}]
;; snip
(assoc state :bucket "dummy-bucket" :key "dummy-key"))
(defrecord Transition [op next-state])
(defn completed? [{:keys [state]}]
(= :completed state))
(defn transition [{:keys [state] :as current-state}]
(log-state "transition from " current-state)
(condp = state
:downloadable (Transition. download-report :uploadable)
:uploadable (Transition. upload-to-s3 :completed)))
(defn step [current-state]
(let [{:keys [op next-state]} (transition current-state)]
(try (assoc (op current-state) :state next-state)
(catch Exception e
(assoc current-state :error e)))))
(defn progressor [states-ch completed-ch]
(go-loop [current-state (<! states-ch)]
(log-state "current-state: " current-state)
(if (completed? current-state)
(do (log-state "sending completed.. " current-state) (>! completed-ch current-state))
(let [{:keys [error] :as new-state} (<! (thread (step current-state)))]
(log-state "new-state: " new-state)
(if error
(do (println "Error: " (:error new-state))
(<! (timeout 5000)) ; delay the retry for 5s
(>! states-ch current-state))
(and new-state (do (log-state "sending state.. " new-state) (>! states-ch new-state))))))
(recur (<! states-ch))))
(defn initial-state [date client]
{:date date
:client client
:state :downloadable})
(def completed-ch (chan))
(def initial-states (repeat 42 (initial-state "dummy-date" "dummy-client")))
(defn start []
(let [states-ch (chan 1024)]
(doseq [_ (range 10)]
(progressor states-ch completed-ch))
;; snip- creates sequence of initial states
;; using initial-state fn.
#_(doseq [state initial-states]
(>!! states-ch state))
(onto-chan states-ch (repeat 15 (initial-state "dummy-date" "dummy-client")) false)
;; snip- wait for all operations to complete
(go-loop [compl (<! completed-ch)]
(log-state "COMPLETE: " compl)
(recur (<! completed-ch)))

@thomas-shares thomas-shares commented Apr 2, 2015

I am getting this error:

Assert failed: Can't recur here at frame at line 46.

This is with the following project.clj:

(defproject progressor "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url ""
:license {:name "Eclipse Public License"
:url ""}
:dependencies [[org.clojure/clojure "1.6.0"]
[org.clojure/core.async "0.1.346.0-17112a-alpha"]])

I haven't really looked into it any more.


@clojj clojj commented Apr 8, 2015

can you retry ?
sorry, messed up the comments... it should use core async's "onto-chan" instead of the (doseq ...) part (which also works, if you comment-in)

