Skip to content

Instantly share code, notes, and snippets.

@lbradstreet
Created February 1, 2016 03:12
Show Gist options
  • Save lbradstreet/616c6281e5757086dae4 to your computer and use it in GitHub Desktop.
Save lbradstreet/616c6281e5757086dae4 to your computer and use it in GitHub Desktop.
(defrecord ReadDatoms [task-map unroll db conn datoms-per-segment datoms top-chunk-index top-acked-chunk-index pending-chunk-indices]
PluginReader
(init [this]
(let [conn (safe-connect task-map)
db (safe-as-of task-map conn)]
(assoc this
:db db
:conn conn
:top-chunk-index (atom -1)
:top-acked-chunk-index (atom -1)
:pending-chunk-indices (atom #{})
:unroll (partial unroll-datom db)
:datoms-per-segment (safe-datoms-per-segment task-map)
:datoms (atom (datoms-sequence db task-map)))))
(checkpoint [this]
;; TODO, if drained, we should put out the complete
{:chunk-index @top-acked-chunk-index :status :incomplete})
(checkpoint-new! [this chunk-index]
(when chunk-index
(swap! top-chunk-index max chunk-index)
(swap! pending-chunk-indices conj chunk-index)))
(checkpoint-acked! [this chunk-index]
(swap! pending-chunk-indices disj chunk-index)
(let [new-top-acked (highest-acked-chunk @top-acked-chunk-index @top-chunk-index @pending-chunk-indices)]
(reset! top-acked-chunk-index new-top-acked)))
(recover! [this {:keys [chunk-index] :as content}]
(reset! top-acked-chunk-index chunk-index)
(reset! top-chunk-index chunk-index)
(swap! datoms (fn [s] (drop (* datoms-per-segment chunk-index) s))))
(next-tick [this next-state]
(let [vs (take datoms-per-segment @datoms)]
(if-not (empty? vs)
(do (swap! datoms (fn [ds] (drop datoms-per-segment ds)))
(->Tick {:datoms (map unroll vs)}
(inc next-state)))
(->Tick :done nil)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment