Skip to content

Instantly share code, notes, and snippets.

@0xqd
Created November 19, 2015 10:45
Show Gist options
  • Save 0xqd/b39f06e514bb254ce715 to your computer and use it in GitHub Desktop.
Save 0xqd/b39f06e514bb254ce715 to your computer and use it in GitHub Desktop.
(defn build-catalog
  "Returns the Onyx catalog as a vector of two task maps: the Kafka input
  task :kafka-read-messages followed by the core.async output task :out.

  batch-size and batch-timeout are applied to the :out task only; the
  Kafka input task uses a fixed :onyx/batch-size of 100 and sets no
  :onyx/batch-timeout."
  [batch-size batch-timeout]
  (let [kafka-input
        {:onyx/name :kafka-read-messages
         :onyx/plugin :onyx.plugin.kafka/read-messages
         :onyx/type :input
         :onyx/medium :kafka
         :onyx/doc "Reads messages from a Kafka topic"
         ;; Kafka connection and consumer settings.
         :kafka/topic "test"
         :kafka/group-id "onyx-consumer"
         :kafka/zookeeper "127.0.0.1:2181"
         :kafka/fetch-size 307200
         :kafka/chan-capacity 1000
         :kafka/offset-reset :smallest
         :kafka/force-reset? true
         :kafka/empty-read-back-off 500
         :kafka/commit-interval 500
         :kafka/deserializer-fn :my.ns/deserializer-fn
         ;; min-peers = max-peers pins this task to exactly 10 peers.
         ;; NOTE(review): presumably meant to match the partition count of
         ;; the "test" topic -- confirm against the broker configuration.
         :onyx/min-peers 10
         :onyx/max-peers 10
         :onyx/batch-size 100}

        channel-output
        {:onyx/name :out
         :onyx/plugin :onyx.plugin.core-async/output
         :onyx/type :output
         :onyx/medium :core.async
         :onyx/doc "Writes segments to a core.async channel"
         :onyx/max-peers 1
         :onyx/batch-size batch-size
         :onyx/batch-timeout batch-timeout}]
    [kafka-input channel-output]))
(defn build-lifecycles
  "Returns the lifecycle entries for the job as a vector of maps, each
  pairing a :lifecycle/task with the keyword naming its :lifecycle/calls.

  The :out task appears twice: once for the sample lifecycle calls and
  once for the core.async writer calls."
  []
  (mapv (fn [[task-name calls-kw]]
          {:lifecycle/task task-name
           :lifecycle/calls calls-kw})
        [;; Kafka input task
         [:kafka-read-messages :onyx.plugin.kafka/read-messages-calls]
         ;; core.async output task
         [:out :onyx-starter.lifecycles.sample-lifecycle/out-calls]
         [:out :onyx.plugin.core-async/writer-calls]]))
(def workflow1
  "Workflow with a single edge, given as [from-task to-task]:
  :kafka-read-messages feeds directly into :out."
  [[:kafka-read-messages
    :out]])
(deftest test-sample-dev-job1
  ;; Starts a development environment, submits job 1, pretty-prints the
  ;; :out value of the result, and always stops the environment afterwards.
  ;;
  ;; Peer count: the catalog in this file sets :onyx/min-peers 10 on
  ;; :kafka-read-messages, and :out needs one more peer, so 11 virtual peers
  ;; are required. The previous value of 8 was below that minimum.
  ;; NOTE(review): assumes submit-job1 submits build-catalog with workflow1
  ;; -- confirm; extra peers are harmless if it does not.
  (let [dev-env (component/start (onyx-dev-env 11))]
    (try
      (let [{:keys [out]} (submit-job1 dev-env)]
        ;; TODO: add real assertions on `out`. The assertions that used to be
        ;; discarded here named question-output and loud-output, neither of
        ;; which is bound in this test.
        (clojure.pprint/pprint out))
      (finally
        ;; Stop the environment even when job submission or printing throws.
        (component/stop dev-env)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment