Skip to content

Instantly share code, notes, and snippets.

@kjothen
Created April 27, 2017 18:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kjothen/6ad77ed36739a0904b75929b810c6c56 to your computer and use it in GitHub Desktop.
Save kjothen/6ad77ed36739a0904b75929b810c6c56 to your computer and use it in GitHub Desktop.
trigger/emit example problem
(ns onyx-trigger-problem.jobs.basic
(:require [onyx.job :refer [add-task register-job]]
[onyx.tasks.core-async :as core-async-task]
[onyx-trigger-problem.tasks.math :as math]))
(defn batch-done?
  "Predicate over a trigger and a state event: returns true only when the
  state event's :event-type is :job-completed. The trigger argument is
  accepted but not inspected."
  [trigger state-event]
  (let [{kind :event-type} state-event]
    (= :job-completed kind)))
(defn emit-inc
  "Trigger emit function: collapses the accumulated window contents into a
  single segment holding the sum of every element's :n value.

  Arguments:
    event, window, trigger, state-event - accepted but not inspected here.
    extent-state - collection of maps, each read via its :n key
                   (presumably the segments accumulated by the window's
                   conj aggregation; confirm against the window config).

  Returns a one-element vector [{:n total}], or nil when extent-state is
  nil or empty so that nothing is emitted for an extent with no contents."
  [event window trigger state-event extent-state]
  ;; `seq` rather than a bare truthiness test: an empty collection is truthy
  ;; in Clojure, so the old `(when extent-state ...)` emitted a spurious
  ;; {:n 0} segment whenever a trigger fired on an already-emptied extent.
  (when (seq extent-state)
    (let [new-extent-state [{:n (reduce + 0 (map :n extent-state))}]]
      ;; Diagnostic output of the raw and the summarized state.
      (println extent-state)
      (println new-extent-state)
      new-extent-state)))
(defn basic-job
  "Assembles the job map for the :in -> :inc -> :out workflow.

  A global window with the conj aggregation is attached to the :inc task,
  and two triggers fire on it: one punctuation trigger gated by
  ::batch-done?, and one segment trigger with a threshold of 5 elements.
  Both triggers emit through ::emit-inc.

  batch-settings is passed unchanged to each of the three task
  constructors. Returns the base job with the :in, :inc and :out tasks
  added via add-task."
  [batch-settings]
  (let [inc-window     {:window/id          :window-inc
                        :window/task        :inc
                        :window/type        :global
                        :window/aggregation :onyx.windowing.aggregation/conj}
        ;; Keys shared by both triggers on the window.
        trigger-common {:trigger/window-id         :window-inc
                        :trigger/refinement        :onyx.refinements/discarding
                        :trigger/fire-all-extents? true
                        :trigger/emit              ::emit-inc}
        on-batch-done  (assoc trigger-common
                              :trigger/id   :punctuation-trigger
                              :trigger/on   :onyx.triggers/punctuation
                              :trigger/pred ::batch-done?)
        every-five     (assoc trigger-common
                              :trigger/id        :segment-trigger
                              :trigger/on        :onyx.triggers/segment
                              :trigger/threshold [5 :elements])
        skeleton       {:workflow        [[:in :inc]
                                          [:inc :out]]
                        :catalog         []
                        :lifecycles      []
                        :windows         [inc-window]
                        :triggers        [on-batch-done every-five]
                        :flow-conditions []
                        :task-scheduler  :onyx.task-scheduler/balanced}]
    (-> skeleton
        (add-task (core-async-task/input :in batch-settings))
        (add-task (math/inc-key :inc [:n] batch-settings))
        (add-task (core-async-task/output :out batch-settings)))))
;; Registers the "basic-job" implementation: builds the job with a batch
;; size of 1 and a batch timeout of 1000. Neither job-name nor config is
;; consulted.
(defmethod register-job "basic-job"
  [job-name config]
  (basic-job {:onyx/batch-size    1
              :onyx/batch-timeout 1000}))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment