@gardnervickers
Created March 4, 2016 02:06
;; Builds an Onyx task bundle for a Kafka input task: the task map, its
;; lifecycle entry, and the schemas describing both.
(s/defn input-task
  ([task-name :- s/Keyword
    {:keys [kafka/topic kafka/group-id
            kafka/zookeeper kafka/offset-reset
            kafka/force-reset? kafka/deserializer-fn
            kafka/chan-capacity kafka/fetch-size
            kafka/empty-read-back-off kafka/commit-interval] :as opts}]
   {:task {:task-map (merge {:onyx/name task-name
                             :onyx/plugin :onyx.plugin.kafka/read-messages
                             :onyx/type :input
                             :onyx/medium :kafka
                             :kafka/topic topic
                             :kafka/group-id group-id
                             :kafka/zookeeper zookeeper
                             :kafka/offset-reset offset-reset
                             :kafka/force-reset? force-reset?
                             :kafka/deserializer-fn deserializer-fn
                             :kafka/chan-capacity chan-capacity
                             :kafka/fetch-size fetch-size
                             :kafka/empty-read-back-off empty-read-back-off
                             :kafka/commit-interval commit-interval
                             :onyx/doc "Reads messages from a Kafka topic"}
                            ;; opts is merged last, so any key it carries
                            ;; (including non-Kafka keys) overrides the above.
                            opts)
           :lifecycles [{:lifecycle/task task-name
                         :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}]}
    :schema {:task-map (merge os/TaskMap KafkaInputSchema)
             :lifecycles [os/Lifecycle]}}))
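
;; Usage sketch, not part of the original gist. It assumes input-task is
;; loaded in the current namespace. Every option value below is a hypothetical
;; placeholder, and :my.app/deserialize-message is an assumed function name;
;; substitute values for your own cluster. Because opts is merged into the
;; task map, extra keys such as :onyx/batch-size pass straight through.
(comment
  (input-task :read-messages
              {:kafka/topic "my-topic"                           ; placeholder
               :kafka/group-id "my-consumer-group"               ; placeholder
               :kafka/zookeeper "127.0.0.1:2181"                 ; placeholder
               :kafka/offset-reset :smallest
               :kafka/force-reset? false
               :kafka/deserializer-fn :my.app/deserialize-message ; assumed name
               :onyx/batch-size 100})
  ;; The call returns a map with two top-level keys, :task and :schema.
  )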