Created
November 20, 2015 21:59
-
-
Save 0xqd/0cb8cb9eb8c3e2f43e9c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns onyx-starter.core | |
(:require | |
[clojure.core.async :as async] | |
[onyx.plugin.core-async :refer [take-segments!]] | |
[com.stuartsierra.component :as component] | |
[taoensso.timbre :refer [info error] :as timbre] | |
[onyx.plugin.kafka] | |
[clj-kafka.producer :as kp] | |
[clj-kafka.admin :as kadmin] | |
[onyx.kafka.embedded-server :as ke] | |
[onyx.api] | |
[midje.sweet :refer :all])) | |
(def id (java.util.UUID/randomUUID)) | |
(def zk-addr "127.0.0.1:2182") | |
;; Onyx | |
(def env-config | |
{:zookeeper/address zk-addr | |
:zookeeper/server? true | |
:zookeeper.server/port 2182 | |
:onyx/id id}) | |
(def peer-config | |
{:zookeeper/address zk-addr | |
:onyx.peer/job-scheduler :onyx.job-scheduler/greedy | |
:onyx.messaging/impl :aeron | |
:onyx.messaging/peer-port 40199 | |
:onyx.messaging/bind-addr "localhost" | |
:onyx/id id}) | |
(def env (onyx.api/start-env | |
env-config)) | |
(def peer-group (onyx.api/start-peer-group peer-config)) | |
(with-open [zk (kadmin/zk-client zk-addr)] | |
(kadmin/create-topic zk "test" | |
{:partitions 2})) | |
(defn deserialize-message [bytes] | |
(read-string (String. bytes "UTF-8"))) | |
(def workflow | |
[[:read-messages :identity] | |
[:identity :out]]) | |
#_(def workflow | |
[[:read-messages :posting-transform] | |
[:posting-transform :out]]) | |
(defn posting-transform [x] | |
(prn "hahaah")) | |
(def catalog | |
[{:onyx/name :read-messages | |
:onyx/plugin :onyx.plugin.kafka/read-messages | |
:onyx/type :input | |
:onyx/medium :kafka | |
:kafka/topic "test" | |
:kafka/group-id "onyx-consumer" | |
:kafka/fetch-size 307200 | |
:kafka/chan-capacity 1000 | |
:kafka/zookeeper "127.0.0.1:2181" | |
:kafka/offset-reset :smallest | |
:kafka/force-reset? true | |
:kafka/empty-read-back-off 500 | |
:kafka/commit-interval 500 | |
:kafka/deserializer-fn :onyx-starter.core/deserialize-message | |
:onyx/min-peers 2 | |
:onyx/max-peers 2 | |
:onyx/batch-size 100 | |
:onyx/doc "Reads messages from a Kafka topic"} | |
{:onyx/name :identity | |
:onyx/fn :clojure.core/identity | |
:onyx/type :function | |
:onyx/batch-size 100} | |
{:onyx/name :posting-transform | |
:onyx/fn :onyx-starter.core/posting-transform | |
:onyx/type :function | |
:onyx/batch-size 100} | |
{:onyx/name :out | |
:onyx/plugin :onyx.plugin.core-async/output | |
:onyx/type :output | |
:onyx/medium :core.async | |
:onyx/max-peers 1 | |
:onyx/batch-size 100 | |
:onyx/doc "Writes segments to a core.async channel"} | |
]) | |
;; buffer | |
(def out-chan (async/chan 1000000)) | |
(async/go-loop [] | |
(try | |
(let [x (async/<! out-chan)] | |
(println "value from go loop" x)) | |
(catch Exception e | |
(prn e))) | |
(recur)) | |
(defn inject-out-ch [event lifecycle] | |
{:core.async/chan out-chan}) | |
(def out-calls | |
{:lifecycle/before-task-start inject-out-ch}) | |
(def lifecycles | |
[;; kafka | |
{:lifecycle/task :read-messages | |
:lifecycle/calls :onyx.plugin.kafka/read-messages-calls} | |
;; out | |
{:lifecycle/task :out | |
:lifecycle/calls :onyx-starter.core/out-calls} | |
{:lifecycle/task :out | |
:lifecycle/calls :onyx.plugin.core-async/writer-calls} | |
]) | |
(def v-peers (onyx.api/start-peers 4 peer-group)) | |
;; run | |
(onyx.api/submit-job | |
peer-config | |
{:catalog catalog | |
:workflow workflow | |
:lifecycles lifecycles | |
:task-scheduler :onyx.task-scheduler/balanced}) | |
(def producer | |
(kp/producer | |
{"metadata.broker.list" "192.168.1.60:9092" | |
"serializer.class" "kafka.serializer.DefaultEncoder" | |
"partitioner.class" "kafka.producer.DefaultPartitioner"})) | |
(def posting | |
{:name "Backend Engineer" | |
:user-id 1 | |
:price 50}) | |
(kp/send-message producer | |
(kp/message "test" (.getBytes (pr-str posting)))) | |
;; functions | |
(defn stop [] | |
(doseq [v-peer v-peers] | |
(onyx.api/shutdown-peer v-peer)) | |
(onyx.api/shutdown-peer-group peer-group) | |
(onyx.api/shutdown-env env) | |
(component/stop kafka-server)) | |
(stop) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment