Created
April 4, 2017 00:17
-
-
Save favila/3bc6fae005228a3290d5509c088e2f11 to your computer and use it in GitHub Desktop.
Transact transactions in core.async channel `from-ch` against datomic connection `conn` with a pipeline depth of `n`.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn tx-pipeline-inorder
  "Transact transactions in core.async channel `from-ch` against datomic connection `conn` with a pipeline depth of `n`.

  Returns a map with keys `:stop!` and `:result`. `:stop!` is a no-arg function you can call to immediately
  cease sending transactions (already sent transactions cannot be stopped). `:result` is a promise channel
  returning a variant of the result of the tx-pipelining after from-ch is closed and drained, or
  :stop! is called and all send transaction futures are deref-ed, or a transaction produces an exception.

  The variant returned from the :result channel may be one of:

    [:completed {:done total-number-of-transactions}] ;; `from-ch` closed and all transactions completed successfully.
    [:stopped {:done total-number-of-transactions-completed}] ;; `:stop!` called, and all transactions sent before :stop!
                                                              ;; completed successfully
    [:error {:last-successful-tx-index long|nil ;; last tx that succeeded
             :tx-index long                     ;; tx that this error is about
                                                ;; When on-tx-complete raises an exception, these two indexes may differ!
             :exception Throwable               ;; The exception raised
             :tx [...]                          ;; The input transaction
             :error-time :on-send | :on-deref | :on-tx-complete ;; At what phase of processing the error occurred
             :still-inflight a/chan | nil       ;; A channel which returns a vector of any sent-but-not-deref-ed
                                                ;; transaction futures after the current one
                                                ;; Each item is `[:inflight index input-tx transaction-future]`
             }

  On any error no further transactions are taken from `from-ch` or sent; transactions that were
  already sent are not waited for and are handed back through `:still-inflight`.

  Optional function `on-tx-complete` is called after every successful transaction with arguments `i`
  (a counter of txs taken from `from-ch`, starting at 0), `tx` (the input transaction), and `result`
  (the tx-data result from the `d/transact` future). The default implementation prints a single dot to standard
  out after every thousand transactions."
  ([conn n from-ch]
   (tx-pipeline-inorder conn n from-ch
     (fn [i tx result]
       ;; `mod` returns a number, which is always truthy, so it must be
       ;; compared against zero explicitly.
       (when (zero? (mod i 1000))
         (print ".")
         (flush)))))
  ([conn n from-ch on-tx-complete]
   (let [;; Takes the next tx from `from-ch` and sends it. Returns an
         ;; `[:inflight ...]` message on success, or a terminal `[:error ...]`
         ;; or `[:done ...]` message.
         send-tx (fn [i]
                   (if-some [tx (a/<!! from-ch)]
                     (try
                       [:inflight i tx (d/transact-async conn tx)]
                       (catch Throwable e
                         [:error {:last-successful-tx-index (when (pos? i) (dec i))
                                  :tx-index                 i
                                  :tx                       tx
                                  :exception                e
                                  :error-time               :on-send}]))
                     [:done i]))
         ;; A hand-written transducer is required here: returning `(reduced x)`
         ;; from the function given to `map` only enqueues the Reduced wrapper
         ;; as a value. This one emits the terminal message and then signals
         ;; termination, which makes the channel close itself.
         send-xf (fn [rf]
                   (fn
                     ([] (rf))
                     ([acc] (rf acc))
                     ([acc i]
                      (let [[kind :as msg] (send-tx i)
                            acc (rf acc msg)]
                        (if (= kind :inflight)
                          acc
                          (ensure-reduced acc))))))
         inflight (a/chan n send-xf)
         ;; Returns a channel yielding a vector of every `[:inflight ...]`
         ;; message still buffered once `inflight` is closed and drained.
         gather-pending (fn [ch]
                          (a/reduce (fn [r msg]
                                      (if (= (first msg) :inflight)
                                        (conj r msg)
                                        r))
                                    [] ch))
         ;; Promise channel (as documented): delivering never blocks the worker
         ;; and every taker sees the same final result.
         completed (a/promise-chan)
         ;; Report the error first so the caller is not kept waiting, then
         ;; close `inflight` so the feeder stops sending and the
         ;; `:still-inflight` reduction can finish.
         fail! (fn [err]
                 (a/>!! completed
                        [:error (assoc err :still-inflight (gather-pending inflight))])
                 (a/close! inflight))]
     ;; Feeder: each put runs `send-xf`, which takes and sends the next tx.
     ;; The put returns false once `inflight` is closed.
     (a/thread
       (loop [i 0]
         (when (a/>!! inflight i)
           (recur (inc i)))))
     ;; Consumer: derefs transaction futures in send order.
     (a/thread
       (loop [total 0]
         (let [[kind :as msg] (a/<!! inflight)]
           (case kind
             :inflight
             (let [[_ i tx tx-future] msg
                   ;; `err` is nil on success, otherwise the error map to report.
                   err (try
                         (let [result @tx-future]
                           (try
                             (on-tx-complete i tx result)
                             nil
                             (catch Throwable e
                               {:last-successful-tx-index i
                                :tx-index                 i
                                :tx                       tx
                                :result                   result
                                :exception                e
                                :error-time               :on-tx-complete})))
                         (catch Throwable e
                           {:last-successful-tx-index (when (pos? i) (dec i))
                            :tx-index                 i
                            :tx                       tx
                            :exception                e
                            :error-time               :on-deref}))]
               (if err
                 (fail! err)
                 (recur (inc total))))

             :error (fail! (second msg))
             :done (a/>!! completed [:completed {:done total}])
             ;; nil: `inflight` was closed by `:stop!` and is now drained.
             nil (a/>!! completed [:stopped {:done total}])))))
     {:result completed
      :stop!  (fn [] (a/close! inflight))})))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment