Skip to content

Instantly share code, notes, and snippets.

@gardnervickers
Created March 8, 2016 13:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gardnervickers/0f22b691e1d98fe9e2b8 to your computer and use it in GitHub Desktop.
Save gardnervickers/0f22b691e1d98fe9e2b8 to your computer and use it in GitHub Desktop.
(defn build-job [db-uri log-end-tx batch-size batch-timeout]
(let [batch-settings {:onyx/batch-size batch-size :onyx/batch-timeout batch-timeout}
base-job (merge {:workflow [[:read-log :persist]]
:catalog []
:lifecycles []
:windows []
:triggers []
:flow-conditions []
:task-scheduler :onyx.task-scheduler/balanced})]
(-> base-job
(add-task (read-datomic-log :read-log
(merge {:datomic/uri db-uri
:checkpoint/key "checkpoint"
:checkpoint/force-reset? false
:onyx/max-peers 1
:datomic/log-end-tx log-end-tx}
batch-settings)))
(add-task (core-async/output-task :persist batch-settings)))))
(defn ensure-datomic!
([db-uri data]
(d/create-database db-uri)
@(d/transact
(d/connect db-uri)
data)))
(def schema
[{:db/id #db/id [:db.part/db]
:db/ident :com.mdrogalis/people
:db.install/_partition :db.part/db}
{:db/id #db/id [:db.part/db]
:db/ident :user/name
:db/valueType :db.type/string
:db/cardinality :db.cardinality/one
:db.install/_attribute :db.part/db}])
(def people
[{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Mike"}
{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Dorrene"}
{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Benti"}
{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Derek"}
{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Kristen"}])
(def people2
[{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Mike2"}
{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Dorrene2"}
{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Benti2"}])
(def people3
[{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Mike3"}
{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Dorrene3"}
{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Benti3"}])
(def people4
[{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Mike4"}
{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Dorrene4"}
{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Benti4"}])
(deftest ^:ci datomic-input-log-test
(let [db-uri (str "datomic:free://localhost:4334/" (java.util.UUID/randomUUID))
{:keys [env-config peer-config]}
(read-config (clojure.java.io/resource "config.edn") {:profile :test})]
(try
(testing "That we can read the initial transaction log"
(with-test-env [test-env [4 env-config peer-config]]
(let [job (build-job db-uri 1002 10 1000)
{:keys [persist]} (core-async/get-core-async-channels job)]
(mapv (partial ensure-datomic! db-uri) [schema people])
(onyx.api/submit-job peer-config job)
(ensure-datomic! db-uri people2)
(is (=
[{:data '(;[13194139534312 50 #inst "2015-08-19T13:27:59.237-00:00" 13194139534312 true]
[63 10 :com.mdrogalis/people 13194139534312 true]
[0 11 63 13194139534312 true]
[64 10 :user/name 13194139534312 true]
[64 40 23 13194139534312 true]
[64 41 35 13194139534312 true]
[0 13 64 13194139534312 true])
:t 1000}
{:data '(;[13194139534313 50 #inst "2015-08-19T13:27:59.256-00:00" 13194139534313 true]
[277076930200554 64 "Mike" 13194139534313 true]
[277076930200555 64 "Dorrene" 13194139534313 true]
[277076930200556 64 "Benti" 13194139534313 true]
[277076930200557 64 "Derek" 13194139534313 true]
[277076930200558 64 "Kristen" 13194139534313 true])
:t 1001}
:done]
(map (fn [result]
(if (= result :done)
:done
;; drop tx datom and id
(-> result
(update :data rest)
(dissoc :id))))
(take-segments! persist)))))
(let [job (build-job db-uri 1014 10 1000)
{:keys [persist]} (core-async/get-core-async-channels job)]
(mapv (partial ensure-datomic! db-uri) [people3 people4 people4 people4])
(onyx.api/submit-job peer-config job)
(is (=
[{:data '([277076930200560 64 "Mike2" 13194139534319 true]
[277076930200561 64 "Dorrene2" 13194139534319 true]
[277076930200562 64 "Benti2" 13194139534319 true]), :t 1007}
{:data '([277076930200564 64 "Mike3" 13194139534323 true]
[277076930200565 64 "Dorrene3" 13194139534323 true]
[277076930200566 64 "Benti3" 13194139534323 true]), :t 1011} :done]
(map (fn [result]
(if (= result :done)
:done
;; drop tx datom and id
(-> result
(update :data rest)
(dissoc :id))))
(take-segments! persist)))))))
(finally (d/delete-database db-uri)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment