Onyx: Windowing, triggers and refinements.

With Onyx 0.8, we gain the ability to manage state. Onyx introduces windowing, as described in Google's Dataflow Model paper.[1]

We will discuss windowing, triggers, and refinements, how they interact, and the different types of windows and triggers. Where we don't provide a concrete implementation, we describe the practical problems each type solves.
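To make the vocabulary concrete, here is a minimal sketch of the simplest window type, a fixed (tumbling) window, in plain Clojure. It is not Onyx's API. The function names and the `:event-time` key (in milliseconds) are our own assumptions. Each segment falls into exactly one half-open extent `[start, end)`.

```clojure
;; Hypothetical sketch, not Onyx's API: fixed (tumbling) windows by event time.
;; Segments are assumed to carry an :event-time in milliseconds.

(def minute-ms (* 60 1000))

(defn fixed-window-start
  "Inclusive start of the fixed window of length `size` containing time `t`.
  Clojure's `mod` is floored, so negative timestamps are handled too."
  [size t]
  (- t (mod t size)))

(defn assign-fixed
  "Group segments by the half-open extent [start end) they fall into."
  [size segments]
  (group-by (fn [{:keys [event-time]}]
              (let [start (fixed-window-start size event-time)]
                [start (+ start size)]))
            segments))

(def segments
  [{:event-time (* 1 minute-ms) :value 1}
   {:event-time (* 9 minute-ms) :value 2}
   {:event-time (* 12 minute-ms) :value 3}])

;; With 10-minute windows, the first two segments share the extent
;; [0 600000] and the third lands in [600000 1200000].
(assign-fixed (* 10 minute-ms) segments)
```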


There are two notions of time discussed in the literature: event time, when an event actually occurred, and processing time, when the system observes it.
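Here is a small, hypothetical illustration of why the distinction matters. Every segment carries both timestamps, and one event arrives late. The keys `:event-time` and `:processing-time` are assumptions, not Onyx fields. Bucketing the same four segments into 10-unit windows gives different counts depending on which clock we use.

```clojure
;; Hypothetical sketch: the same segments bucketed by event time vs processing time.
;; :event-time and :processing-time are illustrative keys, in abstract time units.

(defn window-start
  "Inclusive start of the fixed window of length `size` containing `t`."
  [size t]
  (- t (mod t size)))

(defn counts-by
  "Count segments per fixed window, using `time-key` as the clock."
  [time-key size segments]
  (frequencies (map #(window-start size (get % time-key)) segments)))

(def segments
  ;; Segment 3 happened at t=5 but only reached the pipeline at t=14.
  [{:id 1 :event-time 1  :processing-time 2}
   {:id 2 :event-time 3  :processing-time 4}
   {:id 3 :event-time 5  :processing-time 14}
   {:id 4 :event-time 12 :processing-time 13}])

(counts-by :event-time 10 segments)      ;; => {0 3, 10 1}
(counts-by :processing-time 10 segments) ;; => {0 2, 10 2}
```

Windowing on event time puts the late event back in the window where it happened. Windowing on processing time attributes it to whichever window was open when it arrived.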

[[:window :fixed [10 :minutes]]
[:aggregating [:average :emoji-key]]]
[[:trigger :watermark]
[:on-trigger :accumulating :fire-extent ::to-stdout]]
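The first specification above pairs a 10-minute fixed window averaging `:emoji-key` with a watermark trigger and an accumulating refinement. The sketch below models that combination in plain Clojure under our own assumptions: integer time units instead of minutes, hypothetical function names, and an explicit watermark argument. In practice a watermark is an estimate of event-time progress, and this is not how Onyx implements it. Because the refinement is accumulating, the state survives a firing, so a late segment updates the average on the next firing instead of starting from scratch.

```clojure
;; Hypothetical sketch, not Onyx's implementation: a fixed window averaging
;; :emoji-key, fired by a watermark, with an accumulating refinement.
;; Time is in abstract integer units; the watermark is passed in explicitly.

(defn window-start [size t]
  (- t (mod t size)))

(defn add-segment
  "Fold one segment into the running [sum count] of its fixed window."
  [state size {:keys [event-time emoji-key]}]
  (update state (window-start size event-time)
          (fnil (fn [[sum n]] [(+ sum emoji-key) (inc n)]) [0 0])))

(defn fire-watermark
  "Emit the average of every window whose end is at or before `watermark`.
  Accumulating refinement: the state is returned unchanged, so a later
  firing still reflects every segment seen so far."
  [state size watermark]
  {:state   state
   :emitted (into (sorted-map)
                  (for [[start [sum n]] state
                        :when (<= (+ start size) watermark)]
                    [[start (+ start size)] (/ sum n)]))})

(def state
  (reduce #(add-segment %1 10 %2) {}
          [{:event-time 1 :emoji-key 2}
           {:event-time 4 :emoji-key 4}
           {:event-time 11 :emoji-key 9}]))

;; Watermark reaches 10: only [0 10] is complete, average (2 + 4) / 2.
(:emitted (fire-watermark state 10 10))        ;; => {[0 10] 3}

;; A late segment for [0 10] arrives; the next firing includes all three values.
(def late-state (add-segment state 10 {:event-time 7 :emoji-key 6}))
(:emitted (fire-watermark late-state 10 10))   ;; => {[0 10] 4}
```

The `:emitted` map stands in for whatever a sync function such as `::to-stdout` would receive.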
[[:window :sliding [10 :minutes] :every [1 :minute]]
[:aggregating :conj]]
[[:trigger [:segment 10]]
[:on-trigger :discarding :fire-all-extents ::to-stdout]]
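The second specification slides a 10-minute window every minute, collects segments with `conj`, fires every 10 segments, and discards state after each firing. Here is a hedged sketch of those semantics. As before, it uses integer time units and hypothetical names, and it is not Onyx's implementation. Each segment lands in `size / slide` overlapping extents. A discarding refinement means the second firing only sees segments that arrived since the first.

```clojure
;; Hypothetical sketch, not Onyx's implementation: a sliding window of
;; `size` units advancing every `slide` units, aggregating with conj, fired
;; every `threshold` segments, with a discarding refinement.

(defn sliding-starts
  "Starts of every sliding window that contains time `t`."
  [size slide t]
  (let [latest (- t (mod t slide))]
    (take-while #(> % (- t size)) (iterate #(- % slide) latest))))

(defn add-segment
  "conj the segment into every extent [start end) that contains it."
  [state size slide seg]
  (reduce (fn [st start]
            (update st [start (+ start size)] (fnil conj []) seg))
          state
          (sliding-starts size slide (:event-time seg))))

(defn run-segment-trigger
  "Feed segments one at a time. Every `threshold` segments the trigger fires
  all extents; discarding means the state is cleared after each firing.
  Returns the vector of firings."
  [size slide threshold segments]
  (loop [segs segments, state {}, seen 0, firings []]
    (if-let [[seg & more] (seq segs)]
      (let [state (add-segment state size slide seg)
            seen  (inc seen)]
        (if (zero? (mod seen threshold))
          (recur more {} seen (conj firings state))
          (recur more state seen firings)))
      firings)))

(def segments (mapv (fn [t] {:event-time t}) (range 20)))

(def firings (run-segment-trigger 10 1 10 segments))

(count firings)                         ;; => 2
(count (get (first firings) [0 10]))    ;; => 10, segments 0 through 9
(count (get (second firings) [5 15]))   ;; => 5, only segments 10 through 14
```

With an accumulating refinement we would keep `state` after each firing instead, and `[5 15]` would report all ten segments it has seen.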