@candera
Created March 7, 2014 19:30
transact-firehose
;; This has to be a separate function. The way we had it before, with
;; a loop inside a future, creates a closure that captures the head of
;; the sequence. Locals clearing here saves us from that.
(defn- enqueue-transactions
  "Transact a (potentially infinite) sequence of transactions, putting
  each resulting future on the queue `q` and reporting status via the
  `status` atom. Pause for `delay` milliseconds (if non-nil) between
  transactions."
  [conn [tx & more] q status delay]
  (if (or (not tx) (:done @status))
    (.put q (future :done))
    (do
      (try
        (.put q (d/transact-async conn tx))
        (catch Throwable t
          (swap! status assoc-in [:producer :error] t)
          (swap! status assoc :done true)
          (-> @status :consumer :future future-cancel)
          (throw t))
        (finally
          (swap! status update-in [:producer :tx-count] (fnil inc 0))
          (swap! status assoc-in [:producer :tx] tx)
          ;; This next line lets us reach into a running
          ;; process and determine where we are, so we
          ;; can stop and pick up where we left off.
          (swap! status assoc-in [:producer :txes-remaining] more)))
      (when delay (Thread/sleep delay))
      (recur conn more q status delay))))

(defn transact-firehose
  "Given a database connection and a sequence of transactions, shove
  them into the database as fast as possible. Stops immediately if any
  of the transactions fail. Return a [future status] pair, where
  future represents the thing doing the transacting, and can be
  derefed to block on completion, and status is an atom containing
  information about the ongoing operation. Set :done in the status map
  to true to stop the computation.

  As a simple way to throttle throughput, allows for a short pause of
  `delay` milliseconds between each transaction. nil means run at full
  speed."
  ([conn txes] (transact-firehose conn txes nil))
  ([conn txes delay]
   (let [q (java.util.concurrent.LinkedBlockingQueue. 1000)
         status (atom {:start-time (java.util.Date.)
                       :queue q})
         producer (future (enqueue-transactions conn txes q status delay))
         consumer (future
                    (try
                      (while (and (not (:done @status)) (not= :done (deref (.take q))))
                        (swap! status update-in [:consumer :tx-count] (fnil inc 0)))
                      (catch Throwable t
                        (swap! status assoc-in [:consumer :error] t)
                        (throw t))
                      (finally
                        (swap! status assoc :done true))))]
     (swap! status assoc-in [:producer :future] producer)
     (swap! status assoc-in [:consumer :future] consumer)
     [consumer status])))
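
;; Usage sketch, not part of the original gist. It assumes `d` is an
;; alias for datomic.api, `conn` is an already-open connection, and
;; `txes` is a hypothetical (possibly lazy) sequence of transaction
;; data. The 10 ms delay is an arbitrary illustrative value.
(comment
  (let [[fut status] (transact-firehose conn txes 10)]
    ;; Peek at progress from the REPL or another thread.
    (get-in @status [:consumer :tx-count])
    ;; Ask the producer and consumer loops to stop early.
    (swap! status assoc :done true)
    ;; Block until the consumer finishes. This throws if the consumer
    ;; failed or was cancelled after a producer error.
    @fut))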