(defn start-go-peer
  "Applies the transducer xf to each value taken from in-chan, via
  core.async/pipeline, and puts the results on out-chan."
  ([in-chan xf] (start-go-peer in-chan xf (chan)))
  ([in-chan xf out-chan] (start-go-peer in-chan xf out-chan 1))
  ([in-chan xf out-chan parallelism]
   (let [running? (atom true)]
     (do (async/pipeline parallelism out-chan xf in-chan)
         {:fauonyx/out out-chan
          :fauonyx/in in-chan
{:ip "192.168.204.125", :reacheable? false}
{:ip "192.168.204.126", :reacheable? false}
{:ip "192.168.204.253", :reacheable? false}
{:ip "192.168.204.254", :reacheable? false}
{:ip "192.168.248.119", :reacheable? false}
{:ip "192.168.248.163", :reacheable? false}
{:ip "192.168.248.217", :reacheable? false}
{:ip "192.168.248.227", :reacheable? false}
{:ip "192.168.248.232", :reacheable? false}
{:ip "192.168.248.235", :reacheable? false}
@gardnervickers
gardnervickers / .md
Last active December 10, 2015 03:51

Onyx: Windowing, triggers and refinements.

With Onyx 0.8, we gain state management: Onyx introduces windowing as described in Google's Dataflow Model paper.[1]

We will discuss windowing, triggers, and refinements and how they interact, showing the different types of windows and triggers. Where we do not provide a concrete implementation, we describe the practical problems each type solves.

Time


There are two notions of time discussed in the literature.

15-Dec-10 10:39:17 Gardner-Vickers-Mac.local INFO [onyx.log.zookeeper] - Stopping ZooKeeper client connection
Exception updating the ns-cache #error {
:cause Task clojure.lang.Agent$Action@e382ea9 rejected from java.util.concurrent.ThreadPoolExecutor@44771cf1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 4]
:via
[{:type java.util.concurrent.RejectedExecutionException
:message Task clojure.lang.Agent$Action@e382ea9 rejected from java.util.concurrent.ThreadPoolExecutor@44771cf1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 4]
:at [java.util.concurrent.ThreadPoolExecutor$AbortPolicy rejectedExecution ThreadPoolExecutor.java 2047]}]
:trace
[[java.util.concurrent.ThreadPoolExecutor$AbortPolicy rejectedExecution ThreadPoolExecutor.java 2047]
[java.util.concurrent.ThreadPoolExecutor reject ThreadPoolExecutor.java 823]
(ns fuzzy-schema.core-test
  (:require [schema
             [core :as s]
             [utils :as utils]]
            [schema.spec.core :as spec]))
(def sample-schema
  {:zookeeper/configuration {:zookeeper/address s/Str
                             :zookeeper/port s/Num}
(s/defn input-task
  ([task-name :- s/Keyword {:keys [kafka/topic kafka/group-id
                                   kafka/zookeeper kafka/offset-reset
                                   kafka/force-reset? kafka/deserializer-fn
                                   kafka/chan-capacity kafka/fetch-size
                                   kafka/empty-read-back-off kafka/commit-interval] :as opts}]
   {:task {:task-map (merge {:onyx/name task-name
                             :onyx/plugin :onyx.plugin.kafka/read-messages
                             :onyx/type :input
                             :onyx/medium :kafka
(defn build-job [db-uri log-end-tx batch-size batch-timeout]
  (let [batch-settings {:onyx/batch-size batch-size :onyx/batch-timeout batch-timeout}
        base-job (merge {:workflow [[:read-log :persist]]
                         :catalog []
                         :lifecycles []
                         :windows []
                         :triggers []
                         :flow-conditions []
(def msg-seq [{:msg 1} {:msg 2} {:msg 3} {:type :barrier} {:msg 4} {:msg 5}])
(defn start-peer [id a sleepiness] ;; Simulates a peer lifecycle, writing each message it receives under its slot in a db atom
  (let [ch (chan)
        peer (go-loop [msg (<! ch)]
               (when msg
                 ;; Thread/sleep blocks a core.async dispatch thread; tolerable in a
                 ;; small simulation, whereas (<! (timeout sleepiness)) would park.
                 (Thread/sleep sleepiness)
                 (swap! a update id conj msg)
                 (recur (<! ch))))]
    ch))
(def msg-seq [{:msg 1} {:msg 2} {:msg 3} {:type :barrier} {:msg 4} {:msg 5}])
(defn start-peer [id a]
  (let [ch (chan)
        peer (go-loop [msg (<! ch)]
               (when msg
                 (swap! a update id conj msg)
                 (recur (<! ch))))]
    ch))
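Both peer simulations above are fed `msg-seq`, which places a `{:type :barrier}` marker between data messages. Assuming the barrier marks an epoch boundary, as in barrier-based checkpointing, here is a pure-Clojure sketch of how such a sequence divides into epochs. `barrier?` and `split-at-barriers` are hypothetical helpers, not part of Onyx or core.async.

```clojure
;; Hypothetical sketch: treats {:type :barrier} as an epoch boundary.
(def msg-seq [{:msg 1} {:msg 2} {:msg 3} {:type :barrier} {:msg 4} {:msg 5}])

(defn barrier?
  "True when m is a barrier marker rather than a data message."
  [m]
  (= :barrier (:type m)))

(defn split-at-barriers
  "Splits msgs into the runs of data messages between barriers,
  dropping the barrier markers themselves."
  [msgs]
  (->> msgs
       (partition-by barrier?)          ; runs of data messages and runs of barriers
       (remove #(barrier? (first %)))   ; drop the barrier runs
       (mapv vec)))
```

For `msg-seq`, `(split-at-barriers msg-seq)` returns `[[{:msg 1} {:msg 2} {:msg 3}] [{:msg 4} {:msg 5}]]`.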
[[:window :fixed [10 :minutes]]
 [:aggregating [:average :emoji-key]]]
[[:trigger :watermark]
 [:on-trigger :accumulating :fire-extent ::to-stdout]]
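As we read it, the first window/trigger pair groups segments into fixed 10-minute windows, averages `:emoji-key` within each window, and fires an extent when the watermark passes it, keeping (accumulating) the state afterwards. Below is a minimal plain-Clojure sketch of the window assignment and the averaging. It assumes each segment carries a hypothetical non-negative `:event-time` in epoch milliseconds and a numeric `:emoji-key`. The watermark trigger is not modeled, and none of these functions are Onyx's API.

```clojure
;; Hypothetical sketch, not Onyx's API: assumes each segment has a
;; non-negative :event-time in epoch milliseconds and a numeric :emoji-key.

(def ten-minutes-ms (* 10 60 1000))

(defn fixed-window-id
  "Index of the one fixed window of length range-ms containing time t.
  quot truncates toward zero, so this assumes t >= 0."
  [range-ms t]
  (quot t range-ms))

(defn window-extent
  "The [start end) bounds, in milliseconds, of fixed window id."
  [range-ms id]
  [(* id range-ms) (* (inc id) range-ms)])

(defn average-by-fixed-window
  "Assigns each segment to its fixed window and averages k per window.
  Returns a sorted map of [start end) extent -> average."
  [range-ms k segments]
  (->> segments
       (group-by #(fixed-window-id range-ms (:event-time %)))
       (map (fn [[id segs]]
              [(window-extent range-ms id)
               (/ (reduce + (map k segs)) (count segs))]))
       (into (sorted-map))))
```

For example, `(average-by-fixed-window ten-minutes-ms :emoji-key [{:event-time 0 :emoji-key 2} {:event-time 60000 :emoji-key 4} {:event-time 600000 :emoji-key 10}])` returns `{[0 600000] 3, [600000 1200000] 10}`. Each segment belongs to exactly one fixed window, which is what separates this window type from the sliding one.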
[[:window :sliding [10 :minutes] :every [1 :minute]]
 [:aggregating :conj]]
[[:trigger [:segment 10]]
 [:on-trigger :discarding :fire-all-extents ::to-stdout]]
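The second pair uses sliding windows that are 10 minutes long and start every minute, so a segment lands in up to ten overlapping extents. Its trigger fires after every 10 segments and, with `:discarding`, throws the window state away after each firing. The plain-Clojure sketch below shows these two ideas separately: which extents contain a given time, and how discarding and accumulating refinements differ for a single `conj`-aggregated window. It uses the same hypothetical millisecond `:event-time` assumption and is an illustration, not Onyx's implementation.

```clojure
;; Hypothetical sketch, not Onyx's API. Times are non-negative epoch
;; milliseconds; windows start at multiples of slide-ms, at or after 0.

(defn sliding-window-extents
  "Every [start end) window of length range-ms, starting at a multiple of
  slide-ms, that contains event time t (assumes t >= 0)."
  [range-ms slide-ms t]
  (let [first-start (if (< t range-ms)
                      0
                      ;; smallest multiple of slide-ms strictly greater than t - range-ms
                      (* slide-ms (inc (quot (- t range-ms) slide-ms))))
        last-start  (* slide-ms (quot t slide-ms))]
    (for [start (range first-start (inc last-start) slide-ms)]
      [start (+ start range-ms)])))

(defn fire-every-n-segments
  "Conj-aggregates segments into a single window's state and fires the
  state every n segments. With :discarding the state is cleared after a
  firing; with :accumulating it is kept. Returns the emitted states."
  [refinement n segments]
  (:fired
   (reduce (fn [{:keys [state seen] :as acc} segment]
             (let [state (conj state segment)
                   seen  (inc seen)
                   fire? (zero? (mod seen n))]
               (cond-> (assoc acc :state state :seen seen)
                 fire? (update :fired conj state)
                 (and fire? (= refinement :discarding)) (assoc :state []))))
           {:state [] :seen 0 :fired []}
           segments)))
```

With a 10-minute range and 1-minute slide, a segment at 15 minutes (`900000` ms) falls into ten extents, from `[360000 960000]` to `[900000 1500000]`. With `n` set to 2 for brevity, `(fire-every-n-segments :discarding 2 [1 2 3 4])` emits `[[1 2] [3 4]]`, while `:accumulating` emits `[[1 2] [1 2 3 4]]`. Segments that arrive after the last firing stay in state and are not emitted.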