Skip to content

Instantly share code, notes, and snippets.

@0xqd
Created November 20, 2015 21:59
Show Gist options
  • Save 0xqd/0cb8cb9eb8c3e2f43e9c to your computer and use it in GitHub Desktop.
Save 0xqd/0cb8cb9eb8c3e2f43e9c to your computer and use it in GitHub Desktop.
(ns onyx-starter.core
(:require
[clojure.core.async :as async]
[onyx.plugin.core-async :refer [take-segments!]]
[com.stuartsierra.component :as component]
[taoensso.timbre :refer [info error] :as timbre]
[onyx.plugin.kafka]
[clj-kafka.producer :as kp]
[clj-kafka.admin :as kadmin]
[onyx.kafka.embedded-server :as ke]
[onyx.api]
[midje.sweet :refer :all]))
(def id
  "Random UUID generated once when this namespace loads. env-config and
  peer-config both use it as their :onyx/id."
  (java.util.UUID/randomUUID))

(def zk-addr
  "ZooKeeper host:port string used by the Onyx configs and the topic setup."
  "127.0.0.1:2182")
;; Onyx
(def env-config
  "Configuration map handed to onyx.api/start-env. :zookeeper/server? is true
  and the server port (2182) matches the port in zk-addr."
  {:onyx/id               id
   :zookeeper/address     zk-addr
   :zookeeper/server?     true
   :zookeeper.server/port 2182})
(def peer-config
  "Configuration map used both to start the peer group and to submit the job.
  Shares :onyx/id and the ZooKeeper address with env-config."
  {:onyx/id                  id
   :zookeeper/address        zk-addr
   :onyx.peer/job-scheduler  :onyx.job-scheduler/greedy
   :onyx.messaging/impl      :aeron
   :onyx.messaging/bind-addr "localhost"
   :onyx.messaging/peer-port 40199})
;; Started at load time from env-config; `stop` shuts it down again.
(def env
  (onyx.api/start-env env-config))

;; Started at load time from peer-config; the virtual peers are created on it.
(def peer-group
  (onyx.api/start-peer-group peer-config))
;; Create the "test" topic with two partitions through the ZooKeeper at
;; zk-addr. with-open closes the client once the topic call returns.
(with-open [zk-client (kadmin/zk-client zk-addr)]
  (kadmin/create-topic zk-client "test" {:partitions 2}))
(defn deserialize-message
  "Decodes `bytes` as UTF-8 text and reads a single Clojure form from it.

  Referenced from the catalog as :kafka/deserializer-fn. Returns whatever
  form the text contains (the producer in this file sends a pr-str'd map).
  Throws if the text is not readable or contains a #=(...) eval form."
  [bytes]
  ;; SECURITY: the payload comes off a Kafka topic, i.e. from outside this
  ;; process. With the default *read-eval*, clojure.core/read-string executes
  ;; #=(...) forms embedded in the text, so evaluation is disabled here.
  ;; NOTE(review): clojure.edn/read-string is the stricter choice for
  ;; untrusted data; consider switching if only EDN is ever sent.
  (binding [*read-eval* false]
    (read-string (String. bytes "UTF-8"))))
(def workflow
  "Active job graph: :read-messages feeds :identity, which feeds :out."
  [[:read-messages :identity]
   [:identity :out]])

;; Alternative graph that routes through :posting-transform instead of
;; :identity. Disabled with the #_ reader macro, so it is never evaluated.
#_(def workflow
    [[:read-messages :posting-transform]
     [:posting-transform :out]])
(defn posting-transform
  "Task function for the :posting-transform catalog entry.

  Prints a debug marker and returns the segment `x` unchanged. The earlier
  version returned nil (the result of prn), so nothing useful was passed on
  to the downstream task when this function was wired into the workflow."
  [x]
  (prn "hahaah")
  x)
(def catalog
  "Task definitions for the job: one Kafka input, two function tasks and one
  core.async output. Only the tasks named in `workflow` take part in a run."
  [;; Kafka input reading the \"test\" topic.
   ;; NOTE(review): :kafka/zookeeper points at port 2181, while the topic is
   ;; created through zk-addr (port 2182). Confirm which ZooKeeper the Kafka
   ;; broker is actually registered with.
   {:onyx/name                 :read-messages
    :onyx/plugin               :onyx.plugin.kafka/read-messages
    :onyx/type                 :input
    :onyx/medium               :kafka
    :onyx/min-peers            2
    :onyx/max-peers            2
    :onyx/batch-size           100
    :onyx/doc                  "Reads messages from a Kafka topic"
    :kafka/topic               "test"
    :kafka/group-id            "onyx-consumer"
    :kafka/zookeeper           "127.0.0.1:2181"
    :kafka/fetch-size          307200
    :kafka/chan-capacity       1000
    :kafka/offset-reset        :smallest
    :kafka/force-reset?        true
    :kafka/empty-read-back-off 500
    :kafka/commit-interval     500
    :kafka/deserializer-fn     :onyx-starter.core/deserialize-message}

   ;; Pass-through function task.
   {:onyx/name       :identity
    :onyx/type       :function
    :onyx/fn         :clojure.core/identity
    :onyx/batch-size 100}

   ;; Function task backed by posting-transform in this namespace.
   {:onyx/name       :posting-transform
    :onyx/type       :function
    :onyx/fn         :onyx-starter.core/posting-transform
    :onyx/batch-size 100}

   ;; core.async output, limited to a single peer.
   {:onyx/name       :out
    :onyx/plugin     :onyx.plugin.core-async/output
    :onyx/type       :output
    :onyx/medium     :core.async
    :onyx/max-peers  1
    :onyx/batch-size 100
    :onyx/doc        "Writes segments to a core.async channel"}])
;; buffer
;; Channel handed to the :out task via inject-out-ch; buffers up to
;; 1,000,000 values.
(def out-chan (async/chan 1000000))

;; Background consumer: prints every value taken from out-chan.
;; <! returns nil once the channel is closed, so when-some ends the loop at
;; that point instead of spinning forever printing nil.
(async/go-loop []
  (when-some [segment (async/<! out-chan)]
    (try
      (println "value from go loop" segment)
      (catch Exception e
        ;; Best-effort logging: a failure to print must not kill the consumer.
        (prn e)))
    (recur)))
(defn inject-out-ch
  "Lifecycle hook that returns a map associating :core.async/chan with
  out-chan. Both arguments are accepted but ignored."
  [event lifecycle]
  (hash-map :core.async/chan out-chan))

(def out-calls
  "Lifecycle calls map that registers inject-out-ch under
  :lifecycle/before-task-start."
  {:lifecycle/before-task-start inject-out-ch})
(def lifecycles
  "Lifecycle entries for the job: one for the Kafka input task and two for
  the :out task (this namespace's out-calls plus the plugin's writer calls)."
  [;; kafka
   {:lifecycle/task  :read-messages
    :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}

   ;; out
   {:lifecycle/task  :out
    :lifecycle/calls :onyx-starter.core/out-calls}
   {:lifecycle/task  :out
    :lifecycle/calls :onyx.plugin.core-async/writer-calls}])
;; Four virtual peers started on peer-group at load time; `stop` iterates
;; over this collection to shut each one down.
(def v-peers
  (onyx.api/start-peers 4 peer-group))
;; run
;; Assemble the job description from the definitions above and submit it
;; using peer-config.
(let [job {:workflow       workflow
           :catalog        catalog
           :lifecycles     lifecycles
           :task-scheduler :onyx.task-scheduler/balanced}]
  (onyx.api/submit-job peer-config job))
;; Kafka producer used to publish the sample message. The broker address is
;; hard-coded in the property map below.
(def producer
  (let [producer-props {"metadata.broker.list" "192.168.1.60:9092"
                        "serializer.class"     "kafka.serializer.DefaultEncoder"
                        "partitioner.class"    "kafka.producer.DefaultPartitioner"}]
    (kp/producer producer-props)))
(def posting
  "Sample payload that gets serialized with pr-str and sent to the topic."
  {:name    "Backend Engineer"
   :user-id 1
   :price   50})
;; Publish the sample posting to the "test" topic as pr-str text.
;; The bytes are encoded explicitly as UTF-8 because deserialize-message
;; decodes them as UTF-8; a bare .getBytes would use the platform default
;; charset and could corrupt non-ASCII text on some machines.
(kp/send-message producer
                 (kp/message "test" (.getBytes (pr-str posting) "UTF-8")))
;; functions
(defn stop
  "Shuts down everything this namespace started: each virtual peer in
  v-peers, then the peer group, then the Onyx environment. If a
  `kafka-server` var exists in onyx-starter.core it is stopped last via
  component/stop."
  []
  (doseq [v-peer v-peers]
    (onyx.api/shutdown-peer v-peer))
  (onyx.api/shutdown-peer-group peer-group)
  (onyx.api/shutdown-env env)
  ;; `kafka-server` is not defined anywhere in this file, so referring to it
  ;; directly makes this defn fail to compile. Look the var up at call time
  ;; and stop it only when it is actually present.
  (when-let [kafka-var (ns-resolve 'onyx-starter.core 'kafka-server)]
    (component/stop @kafka-var)))

;; NOTE(review): this runs at load time, immediately after the message is
;; sent -- confirm the job is meant to be torn down this early.
(stop)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment