Created
November 19, 2015 10:45
-
-
Save 0xqd/b39f06e514bb254ce715 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn build-catalog
  "Returns the Onyx catalog for the Kafka -> core.async job: a vector of
  two task maps, `:kafka-read-messages` (input) and `:out` (output).

  `batch-size` and `batch-timeout` are applied to the `:out` task only.
  The Kafka input task uses a fixed `:onyx/batch-size` of 100 and does not
  set `:onyx/batch-timeout`."
  [batch-size batch-timeout]
  [;; Input task: reads from the Kafka topic \"test\".
   {:onyx/name :kafka-read-messages
    :onyx/plugin :onyx.plugin.kafka/read-messages
    :onyx/type :input
    :onyx/medium :kafka
    :kafka/topic "test"
    :kafka/group-id "onyx-consumer"
    :kafka/fetch-size 307200
    :kafka/chan-capacity 1000
    :kafka/zookeeper "127.0.0.1:2181"
    :kafka/offset-reset :smallest
    :kafka/force-reset? true
    :kafka/empty-read-back-off 500
    :kafka/commit-interval 500
    :kafka/deserializer-fn :my.ns/deserializer-fn
    ;; min-peers == max-peers pins this task to exactly 10 peers.
    ;; NOTE(review): presumably matches the topic's partition count -- confirm.
    :onyx/min-peers 10
    :onyx/max-peers 10
    ;; NOTE(review): literal value; the `batch-size` argument is not used here.
    :onyx/batch-size 100
    :onyx/doc "Reads messages from a Kafka topic"}

   ;; Output task: writes segments to a core.async channel.
   {:onyx/name :out
    :onyx/plugin :onyx.plugin.core-async/output
    :onyx/type :output
    :onyx/medium :core.async
    :onyx/max-peers 1
    :onyx/batch-timeout batch-timeout
    :onyx/batch-size batch-size
    :onyx/doc "Writes segments to a core.async channel"}])
(defn build-lifecycles
  "Returns the lifecycle entries for the job as a vector of three maps:
  one for the `:kafka-read-messages` task and two for the `:out` task
  (the sample `out-calls` followed by the core.async `writer-calls`)."
  []
  [;; kafka-read-messages
   {:lifecycle/task :kafka-read-messages
    :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}

   ;; output -- two lifecycle entries are attached to the same task
   {:lifecycle/task :out
    :lifecycle/calls :onyx-starter.lifecycles.sample-lifecycle/out-calls}
   {:lifecycle/task :out
    :lifecycle/calls :onyx.plugin.core-async/writer-calls}])
(def workflow1
  "Workflow with a single edge: segments flow from the
  `:kafka-read-messages` task directly to the `:out` task."
  [[:kafka-read-messages :out]])
(deftest test-sample-dev-job1
  ;; workflow1 has two tasks. The catalog pins :kafka-read-messages to
  ;; exactly 10 peers (:onyx/min-peers 10) and :out needs at least one more,
  ;; so 11 peers are requested; with fewer the job cannot be fully covered.
  ;; NOTE(review): assumes the argument to onyx-dev-env is the number of
  ;; virtual peers and that submit-job1 uses build-catalog -- confirm.
  (let [dev-env (component/start (onyx-dev-env 11))]
    (try
      (let [{:keys [out]} (submit-job1 dev-env)]
        ;; No assertions are active yet; the output is only printed.
        (clojure.pprint/pprint out)
        ;; Disabled assertions; question-output and loud-output are not
        ;; bound anywhere in this test.
        #_(is (= 12 (count question-output)))
        #_(is (= 12 (count loud-output))))
      (finally
        ;; Always shut the dev environment down, even if the job throws.
        (component/stop dev-env)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment