Created
February 1, 2016 03:12
-
-
Save lbradstreet/616c6281e5757086dae4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defrecord ReadDatoms [task-map unroll db conn datoms-per-segment datoms top-chunk-index top-acked-chunk-index pending-chunk-indices] | |
PluginReader | |
(init [this] | |
(let [conn (safe-connect task-map) | |
db (safe-as-of task-map conn)] | |
(assoc this | |
:db db | |
:conn conn | |
:top-chunk-index (atom -1) | |
:top-acked-chunk-index (atom -1) | |
:pending-chunk-indices (atom #{}) | |
:unroll (partial unroll-datom db) | |
:datoms-per-segment (safe-datoms-per-segment task-map) | |
:datoms (atom (datoms-sequence db task-map))))) | |
(checkpoint [this] | |
;; TODO, if drained, we should put out the complete | |
{:chunk-index @top-acked-chunk-index :status :incomplete}) | |
(checkpoint-new! [this chunk-index] | |
(when chunk-index | |
(swap! top-chunk-index max chunk-index) | |
(swap! pending-chunk-indices conj chunk-index))) | |
(checkpoint-acked! [this chunk-index] | |
(swap! pending-chunk-indices disj chunk-index) | |
(let [new-top-acked (highest-acked-chunk @top-acked-chunk-index @top-chunk-index @pending-chunk-indices)] | |
(reset! top-acked-chunk-index new-top-acked))) | |
(recover! [this {:keys [chunk-index] :as content}] | |
(reset! top-acked-chunk-index chunk-index) | |
(reset! top-chunk-index chunk-index) | |
(swap! datoms (fn [s] (drop (* datoms-per-segment chunk-index) s)))) | |
(next-tick [this next-state] | |
(let [vs (take datoms-per-segment @datoms)] | |
(if-not (empty? vs) | |
(do (swap! datoms (fn [ds] (drop datoms-per-segment ds))) | |
(->Tick {:datoms (map unroll vs)} | |
(inc next-state))) | |
(->Tick :done nil))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment