Skip to content

Instantly share code, notes, and snippets.

@robert-stuttaford
Created August 14, 2015 06:22
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robert-stuttaford/3bd5240c988f05092504 to your computer and use it in GitHub Desktop.
Save robert-stuttaford/3bd5240c988f05092504 to your computer and use it in GitHub Desktop.
(ns onyx-tx-report-queue
(:require [clojure.core.async :refer [>!! alts!! chan close! put! thread]]
[clojure.tools.logging :as log]
;; :all so clj-refactor doesn't remove it:
[onyx.plugin.core-async :refer :all]
[datomic.api :as d])
(:import [java.util.concurrent TimeUnit]))
(defn prepare-datom
  "Turns a datom into a plain vector `[e attr-ident v tx added]`.

  The datom is destructured positionally as `[e a v tx added]`; the
  attribute `a` is replaced by whatever `(d/ident db a)` returns for it,
  the other four positions are passed through untouched."
  [db [e a v tx added]]
  (let [attr-ident (d/ident db a)]
    (vector e attr-ident v tx added)))
(defn- offer-segment!
  "Blocks until `segment` has been put on `out-ch` or until `control-ch`
  yields a value (a stop signal, or nil because it was closed).

  Returns true only when the put succeeded on an open `out-ch`; returns
  false when the control channel fired first or `out-ch` is closed."
  [out-ch control-ch segment]
  ;; :priority true with control-ch listed first, so a pending stop signal
  ;; always wins over delivering one more segment.
  (let [[result port] (alts!! [control-ch [out-ch segment]] :priority true)]
    (and (= port out-ch) (true? result))))

(defn- submit-tx-result!
  "Logs a summary of `tx-result` and offers one segment per datom in its
  :tx-data to `out-ch`.

  Returns true when every segment was delivered, false as soon as one put
  is abandoned (see `offer-segment!`)."
  [db-uri out-ch control-ch tx-result]
  (log/debug tx-result)
  (let [db      (:db-after tx-result)
        tx-data (:tx-data tx-result)]
    ;; this will print out a summary of the attrs present in the tx.
    ;; useful to see the shape of the data without having to read the
    ;; whole datoms vec.
    (log/info "Submitting" (count tx-data) "datoms for:" db-uri
              "\n\t" (->> tx-data
                          (map :a)
                          distinct
                          (map (partial d/attribute db))
                          (map :ident)
                          sort
                          vec))
    (loop [remaining (seq tx-data)]
      (if (nil? remaining)
        true
        ;; NOTE(review): `t-for-db` is neither defined nor required in the
        ;; visible part of this file -- confirm where it comes from.
        (let [segment {:datom  (prepare-datom db (first remaining))
                       :db-uri db-uri
                       :t      (t-for-db db-uri)}]
          (log/debug "Submitting datom from tx: " (:datom segment))
          (if (offer-segment! out-ch control-ch segment)
            (recur (next remaining))
            false))))))

(defn inject-tx-report-chan
  "Lifecycle hook (registered under :lifecycle/before-task-start in
  `tx-report-calls`): connects to `db-uri`, starts a background thread that
  drains the connection's tx-report queue, and returns the channels it
  created.

  Arguments:
    event     - the lifecycle event map (not read here).
    lifecycle - map with :db-uri (Datomic URI) and :capacity (buffer
                argument passed to `chan` for the output channel).

  Returns a map with:
    :core.async/chan - channel receiving one segment per datom, shaped
                       {:datom [e attr-ident v tx added] :db-uri .. :t ..}
    :control-chan    - unbuffered channel; putting any value on it, or
                       closing it, stops the background thread.

  The thread polls the queue with a 5 second timeout so it re-checks the
  control channel regularly. Puts onto the output channel also watch the
  control channel, so a full output channel cannot keep the thread from
  stopping. The thread also stops once the output channel is closed.
  Exceptions raised in the thread are logged at error level and rethrown."
  [event {:keys [db-uri capacity]}]
  (let [conn         (d/connect db-uri)
        tx-report-ch (chan capacity)
        control-ch   (chan)]
    (thread
      (log/info "Started tx-report thread for:" db-uri)
      (try
        (let [queue (d/tx-report-queue conn)]
          (loop []
            ;; Non-blocking check: [::continue :default] means nothing was
            ;; put on the control channel and it has not been closed.
            (let [signal (alts!! [control-ch] :default ::continue)]
              (log/debug "tx-report-thread signal value:" signal)
              (when (= [::continue :default] signal)
                (let [tx-result (.poll queue 5 TimeUnit/SECONDS)]
                  ;; nil tx-result = poll timed out; just loop again.
                  (when (or (nil? tx-result)
                            (submit-tx-result! db-uri tx-report-ch
                                               control-ch tx-result))
                    (recur))))))
          (log/info "Stopped tx-report thread for:" db-uri))
        (catch Exception e
          ;; Throwable first so the logger records the stack trace.
          (log/error e "TX-REPORT-TAKE exception for:" db-uri)
          (throw e))))
    {:core.async/chan tx-report-ch
     :control-chan    control-ch}))
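;; We spiked this part but have disabled it (by commenting out the
;; :lifecycle/after-task-stop entry in tx-report-calls below) because it
;; is called after the first tx is processed rather than when we kill the
;; job. But you get what we're trying to do, I hope :-)
;; NOTE: in this listing the :lifecycle/after-task-stop entry is still
;; present in tx-report-calls, i.e. it is not actually commented out.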
(defn dispose-tx-report-chan
  "Lifecycle hook that tears down what `inject-tx-report-chan` set up.

  Removes the tx-report queue from the connection for `db-uri`, puts
  ::stop on the event's :control-chan and closes it, closes the event's
  :core.async/chan, and returns `event` without those two keys."
  [event {:keys [db-uri]}]
  (let [control-ch (:control-chan event)
        output-ch  (:core.async/chan event)]
    (d/remove-tx-report-queue (d/connect db-uri))
    (log/info "Stopping tx-report thread for" db-uri "...")
    ;; Signal first, then close, both on the control channel.
    (doto control-ch
      (put! ::stop)
      (close!))
    (close! output-ch)
    (dissoc event :control-chan :core.async/chan)))
;; Lifecycle calls map for the :tx-report task. It is referenced by keyword
;; (:onyx-tx-report-queue/tx-report-calls) from tx-report-lifecycles.
;;   :lifecycle/before-task-start -> inject-tx-report-chan
;;   :lifecycle/after-task-stop   -> dispose-tx-report-chan
;; NOTE(review): the comment above dispose-tx-report-chan says the
;; after-task-stop entry was disabled, yet it is present here -- confirm
;; which is intended.
(def tx-report-calls
{:lifecycle/before-task-start inject-tx-report-chan
:lifecycle/after-task-stop dispose-tx-report-chan})
(defn tx-report-lifecycles
  "Builds the two lifecycle entries for the :tx-report task.

  The first entry points at :onyx-tx-report-queue/tx-report-calls and
  carries `db-uri` and `capacity` so the lifecycle functions can read
  them; the second points at :onyx.plugin.core-async/reader-calls.

  Returns a vector of the two maps, in that order."
  [db-uri capacity]
  (let [task-name        :tx-report
        channel-entry    {:lifecycle/task  task-name
                          :db-uri          db-uri
                          :capacity        capacity
                          :lifecycle/calls :onyx-tx-report-queue/tx-report-calls}
        core-async-entry {:lifecycle/task  task-name
                          :lifecycle/calls :onyx.plugin.core-async/reader-calls}]
    [channel-entry core-async-entry]))
(defn lazy-worker-bee
  "Pass-through task function: returns `segment` unchanged.

  Two arities are accepted:
    [segment]         - for a catalog entry that declares no extra
                        parameters (as the :lazy-worker-bee entry built by
                        `catalog` does), so only the segment is passed.
    [entries segment] - the original signature; `entries` is ignored."
  ([segment]
   segment)
  ([entries segment]
   segment))
(defn catalog
  "Builds the catalog: a vector with the :tx-report input entry followed
  by the :lazy-worker-bee function entry.

  `batch-size` is set as :onyx/batch-size on both entries. `db-uri` is
  accepted but not used by this function."
  [db-uri batch-size]
  (let [input-entry    {:onyx/name      :tx-report
                        :onyx/ident     :core.async/read-from-chan
                        :onyx/type      :input
                        :onyx/medium    :core.async
                        :onyx/max-peers 1
                        :onyx/doc       "Reads segments from a core.async channel"}
        function-entry {:onyx/name :lazy-worker-bee
                        :onyx/fn   :onyx-tx-report-queue/lazy-worker-bee
                        :onyx/type :function}]
    ;; Both entries share the same batch size.
    (mapv #(assoc % :onyx/batch-size batch-size)
          [input-entry function-entry])))
;; Usage example. It is wrapped in `comment`, so it is never evaluated when
;; the namespace loads; it is meant to be run by hand.
;; It submits a job whose workflow sends :tx-report into :lazy-worker-bee,
;; using `catalog` and `tx-report-lifecycles` from this file.
;; NOTE(review): the `onyx` alias and `peer-config` are neither required nor
;; defined in the visible part of this file -- they must be supplied before
;; evaluating this form.
(comment
(let [db-uri "datomic:free://your-db"
input-chan-capacity 100
batch-size 1]
(onyx/submit-job
peer-config
{:catalog (catalog db-uri batch-size)
:workflow [:tx-report :lazy-worker-bee]
:lifecycles (tx-report-lifecycles db-uri input-chan-capacity)
:flow-conditions []
:task-scheduler :onyx.task-scheduler/balanced}))
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment