@favila
Created April 4, 2017 00:17
Transact transactions in core.async channel `from-ch` against datomic connection `conn` with a pipeline depth of `n`.
(defn tx-pipeline-inorder
  "Transact transactions in core.async channel `from-ch` against datomic connection `conn` with a pipeline depth of `n`.

  Returns a map with keys `:stop!` and `:result`. `:stop!` is a no-arg function you can call to immediately
  cease sending transactions (already sent transactions cannot be stopped). `:result` is a promise channel
  returning a variant of the result of the tx-pipelining after `from-ch` is closed and drained, or
  `:stop!` is called and all sent transaction futures are deref-ed, or a transaction produces an exception.

  The variant returned from the `:result` channel may be one of:

  [:completed {:done total-number-of-transactions}] ;; `from-ch` closed and all transactions completed successfully.

  [:stopped {:done total-number-of-transactions-completed}] ;; `:stop!` called, and all transactions sent before
                                                             ;; `:stop!` completed successfully

  [:error {:last-successful-tx-index long|nil ;; last tx that succeeded
           :tx-index long ;; tx that this error is about
                          ;; When on-tx-complete raises an exception, these two indexes are equal
                          ;; (the transaction itself succeeded).
           :exception Throwable ;; The exception raised
           :tx [...] ;; The input transaction
           :error-time :on-send | :on-deref | :on-tx-complete ;; At what phase of processing the error occurred
           :still-inflight a/chan | nil ;; A channel which returns a vector of any sent-but-not-deref-ed
                                        ;; transaction futures after the current one.
                                        ;; Each item is `[:inflight index input-tx transaction-future]`
           }]

  Optional function `on-tx-complete` is called after every successful transaction with arguments `i`
  (a counter of txs taken from `from-ch`, starting at 0), `tx` (the input transaction), and `result`
  (the result of deref-ing the `d/transact-async` future). The default implementation prints a single dot
  to standard out after every thousand transactions."
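  ([conn n from-ch]
   (tx-pipeline-inorder conn n from-ch
     (fn [i tx result]
       ;; `(mod i 1000)` alone is always truthy, so test for zero explicitly.
       ;; This prints at i = 0, 1000, 2000, ...
       (when (zero? (mod i 1000))
         (print ".")
         (flush)))))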
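  ([conn n from-ch on-tx-complete]
   (let [inflight (a/chan n
                    (comp
                      ;; For each index `i`, take the next tx from `from-ch` and send it.
                      (map (fn [i]
                             (if-some [tx (a/<!! from-ch)]
                               (try
                                 [:inflight i tx (d/transact-async conn tx)]
                                 (catch Throwable e
                                   [:error {:last-successful-tx-index (when-not (zero? i) (dec i))
                                            :tx-index i
                                            :tx tx
                                            :exception e
                                            :error-time :on-send}]))
                               [:done i])))
                      ;; Enqueue the first terminal message (:error or :done), then close the
                      ;; channel. Returning `(reduced msg)` from the mapping fn would only enqueue
                      ;; the wrapper; the reducing step itself has to return a reduced value.
                      (fn [rf]
                        (fn
                          ([] (rf))
                          ([acc] (rf acc))
                          ([acc msg]
                           (let [ret (rf acc msg)]
                             (if (= :inflight (first msg)) ret (ensure-reduced ret)))))))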
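         gather-pending (fn [inflight]
                          ;; Close first so the feeder stops sending and the reduce below
                          ;; finishes once the already-buffered messages are drained.
                          (a/close! inflight)
                          (a/reduce (fn [r msg]
                                      (if (= (first msg) :inflight)
                                        (conj r msg)
                                        r))
                                    [] inflight))
         ;; A promise channel, as the docstring states: the result can be read more than once.
         completed (a/promise-chan)]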
     ;; Feeder: push increasing indexes into `inflight` until it is closed.
     (a/thread
       (loop [i 0]
         (when (a/>!! inflight i)
           (recur (inc i)))))
     ;; Consumer: deref each future in order and report the final outcome on `completed`.
     (a/thread
       (loop [total 0]
         (let [[kind :as msg] (a/<!! inflight)]
           (case kind
             :inflight (let [[_ i tx tx-future] msg
                             result (try @tx-future
                                         (catch Throwable e
                                           (a/>!! completed
                                                  [:error {:last-successful-tx-index (when-not (zero? i) (dec i))
                                                           :tx-index i
                                                           :tx tx
                                                           :exception e
                                                           :error-time :on-deref
                                                           :still-inflight (gather-pending inflight)}])
                                           nil))]
                         (when result
                           ;; `recur` cannot appear inside `try`, so capture a callback error
                           ;; as a value, then either report it and stop, or continue.
                           (if-some [error (try
                                             (on-tx-complete i tx result)
                                             nil
                                             (catch Throwable e
                                               [:error {:last-successful-tx-index i
                                                        :tx-index i
                                                        :tx tx
                                                        :result result
                                                        :exception e
                                                        :error-time :on-tx-complete
                                                        :still-inflight (gather-pending inflight)}]))]
                             (a/>!! completed error)
                             (recur (inc total)))))
             :error (a/>!! completed (assoc-in msg [1 :still-inflight] (gather-pending inflight)))
             :done (a/>!! completed [:completed {:done total}])
             nil (a/>!! completed [:stopped {:done total}])))))
     {:result completed
      :stop! (fn [] (a/close! inflight))})))
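
The docstring describes three result variants and an optional `on-tx-complete` callback. Below is a minimal sketch of how a caller might consume them. The names `summarize-result` and `counting-callback` are hypothetical helpers, not part of the gist, and the `comment` form assumes a Datomic connection `conn`, a transaction channel `tx-ch`, and `clojure.core.async` aliased as `a`.

```clojure
;; Hypothetical helper: turn a result variant, as described in the docstring
;; of tx-pipeline-inorder, into a one-line human-readable summary.
(defn summarize-result
  [[kind payload]]
  (case kind
    :completed (str "completed: " (:done payload) " transactions")
    :stopped   (str "stopped early: " (:done payload) " transactions completed")
    :error     (let [{:keys [tx-index error-time last-successful-tx-index exception]} payload]
                 (str "error at tx " tx-index
                      " during " (name error-time)
                      ", last successful tx: " (if (some? last-successful-tx-index)
                                                 last-successful-tx-index
                                                 "none")
                      ", cause: " (when exception
                                    (.getMessage ^Throwable exception))))
    ;; Fallback for anything that is not one of the three documented variants.
    (str "unrecognized result: " (pr-str kind))))

;; Hypothetical on-tx-complete callback: counts completed transactions in an
;; atom instead of printing dots. It takes the same three arguments
;; (index, input tx, result) that the pipeline passes to the callback.
(defn counting-callback
  [counter]
  (fn [_i _tx _result]
    (swap! counter inc)))

(comment
  ;; Usage sketch only; not evaluated. Assumes `conn`, `tx-ch`, and the `a`
  ;; alias exist in the calling namespace.
  (let [counter          (atom 0)
        {:keys [result]} (tx-pipeline-inorder conn 20 tx-ch (counting-callback counter))]
    ;; Block until the pipeline finishes, stops, or fails.
    (println (summarize-result (a/<!! result)))
    (println "callback saw" @counter "transactions")))
```

Destructuring the variant as `[kind payload]` keeps the caller's branching in one `case`, mirroring how the pipeline itself dispatches on the first element of each message.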