Skip to content

Instantly share code, notes, and snippets.

@jeremyheiler
Created February 6, 2017 22:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeremyheiler/e049f1fc58e19103e91d3967d29b9847 to your computer and use it in GitHub Desktop.
Save jeremyheiler/e049f1fc58e19103e91d3967d29b9847 to your computer and use it in GitHub Desktop.
(ns foo.search.main
(:require
[clojure.core.async :refer [chan sliding-buffer >!! <!!]]
[clojure.pprint :refer [pprint]]
[datomic.api :as d]
[onyx.plugin.core-async]
[onyx.plugin.datomic]
[onyx.api]))
(def out-chan
  "core.async channel with a fixed buffer of 1000 items. `inject-out-ch`
  returns it under :core.async/chan, and `run` drains it with
  `take-segments!`."
  (let [buffer-size 1000]
    (chan buffer-size)))
(defn inject-out-ch
  "Lifecycle function used for :lifecycle/before-task-start.

  Ignores both arguments and returns a map that places `out-chan` under
  the :core.async/chan key."
  [_event _lifecycle]
  {:core.async/chan out-chan})
(def out-calls
  "Lifecycle calls map for the :out task: runs `inject-out-ch` at
  :lifecycle/before-task-start. Referenced from the job's lifecycles by
  its fully-qualified keyword, :foo.search.main/out-calls."
  {:lifecycle/before-task-start inject-out-ch})
(defn run
  "Starts an Onyx environment, peer group and 10 peers, submits a
  two-task job (:read-log -> :out), pretty-prints the segments taken from
  `out-chan`, and then shuts everything down.

  Each resource is started inside its own try/finally, so whatever was
  successfully started is also shut down when a later startup step, the
  job submission, or an earlier shutdown step throws. Shutdown order on
  the normal path is peers, then peer group, then env.

  Takes no arguments. Returns nil (the value of `pprint`)."
  []
  (println "Starting Onyx...")
  (let [onyx-id (java.util.UUID/randomUUID)
        ;; env and peers share the same ZooKeeper address and tenancy id.
        env-config {:zookeeper/address "127.0.0.1:2188"
                    :zookeeper/server? true
                    :zookeeper.server/port 2188
                    :onyx/tenancy-id onyx-id}
        peer-config {:zookeeper/address "127.0.0.1:2188"
                     :onyx.peer/job-scheduler :onyx.job-scheduler/greedy
                     :onyx.messaging/impl :aeron
                     :onyx.messaging/peer-port 40200
                     :onyx.messaging/bind-addr "localhost"
                     :onyx/tenancy-id onyx-id}
        ;; NOTE(review): :datomic/log-end-tx is nil, which presumably means
        ;; the log read has no upper bound -- confirm that `take-segments!`
        ;; below can ever complete with this setting.
        catalog [{:onyx/name :read-log
                  :onyx/plugin :onyx.plugin.datomic/read-log
                  :onyx/type :input
                  :onyx/medium :datomic
                  :datomic/uri "datomic:ddb-local://localhost:8000/foo/foo"
                  :datomic/log-start-tx 1234
                  :datomic/log-end-tx nil
                  ;;:checkpoint/force-reset? true
                  :onyx/max-peers 1
                  :onyx/batch-timeout 50
                  :onyx/batch-size 10
                  :onyx/doc "Reads a sequence of datoms from the d/log API"}
                 {:onyx/name :out
                  :onyx/plugin :onyx.plugin.core-async/output
                  :onyx/type :output
                  :onyx/medium :core.async
                  :onyx/max-peers 1
                  :onyx/batch-timeout 50
                  :onyx/batch-size 10
                  :onyx/doc "Writes segments to a core.async channel"}]
        lifecycles [{:lifecycle/task :read-log
                     :lifecycle/calls :onyx.plugin.datomic/read-log-calls}
                    {:lifecycle/task :out
                     :lifecycle/calls :foo.search.main/out-calls}
                    {:lifecycle/task :out
                     :lifecycle/calls :onyx.plugin.core-async/writer-calls}]
        job {:workflow [[:read-log :out]]
             :catalog catalog
             :lifecycles lifecycles
             :task-scheduler :onyx.task-scheduler/balanced}
        env (onyx.api/start-env env-config)]
    (try
      (let [peer-group (onyx.api/start-peer-group peer-config)]
        (try
          (let [peers (onyx.api/start-peers 10 peer-group)]
            (try
              (onyx.api/submit-job peer-config job)
              (pprint (onyx.plugin.core-async/take-segments! out-chan))
              (finally
                (println "Stopping Onyx...")
                (doseq [v-peer peers]
                  (onyx.api/shutdown-peer v-peer)))))
          (finally
            ;; Runs even if starting or stopping the peers threw.
            (onyx.api/shutdown-peer-group peer-group))))
      (finally
        ;; Runs even if the peer group failed to start or stop.
        (onyx.api/shutdown-env env)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment