@gardnervickers
Last active December 10, 2015 03:51

Onyx: Windowing, triggers and refinements.

Onyx 0.8 adds state management by introducing windowing, as described in Google's Dataflow Model paper. [1]

We will discuss windowing, triggers, and refinements, how they interact, and the different types of windows and triggers. Where we do not provide a concrete implementation, we describe a practical problem that each type solves.

Time


There are two notions of time discussed in the literature.

Event Time: The time an event actually occurred. Temperature at 5:00pm, Height at age 22, or even 20th person to hike K2. The Event Time is an attribute that, in relation to all other segments, can be placed in a global monotonic order.

Processing Time: The time that the event is observed while being processed. This is akin to the wall clock time of the system, and allows us to make statements about segments being processed at arbitrary moments.
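As a minimal sketch of the difference, the segments below carry both notions of time. The `:event-time` and `:arrived-at` keys and their values (in minutes) are illustrative assumptions, not part of Onyx.

```clojure
;; Hypothetical segments: :event-time is when the reading was taken,
;; :arrived-at is when our system observed it (both in minutes).
(def segments
  [{:sensor-id 1 :event-time 154 :arrived-at 1}
   {:sensor-id 2 :event-time 46  :arrived-at 2}
   {:sensor-id 3 :event-time 364 :arrived-at 3}])

(defn event-time-order
  "Sensor ids ordered by when each event actually occurred."
  [segs]
  (mapv :sensor-id (sort-by :event-time segs)))

(defn processing-time-order
  "Sensor ids ordered by when each segment was observed."
  [segs]
  (mapv :sensor-id (sort-by :arrived-at segs)))

(event-time-order segments)      ;; => [2 1 3]
(processing-time-order segments) ;; => [1 2 3]
```

The two orderings disagree whenever segments arrive late or out of order, which is exactly the case that windows and triggers have to handle.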

Window Types


All windows provide a way to exclude segments based on an attribute of each segment. This attribute determines the window's classification. The window types are listed below from most to least general.

  • Predicate Window
    • Onyx does not currently support this.
    • Exclude segments by:
      • Arbitrary predicate on the segments.
  • Session Window
    • Exclude segments by:
      • :key in the segment
      • The timeout gap (timespan) it occurred inside of.
    • This is a specialization of predicate windows where:
      • The predicate is on the :key and Event Time [2] of a segment.
  • Sliding Window
    • Exclude segments by:
      • Their count in the local ordering of segments.
    • This is a specialization of session windows where:
      • The :key result is the same for every segment.
      • The timeout gap is from 0 -> N, where N is the window width and 0 was established at window creation. [2]
  • Fixed Window
    • Exclude segments by:
      • Their count in the global ordering of segments.
    • This is a specialization of sliding windows where:
      • The timeout gap is from 0 -> N, where N is the width of the window and 0 was established at job creation.
  • Global Window
    • Does not exclude any segments in the system.
    • This is a specialization of fixed windows where:
      • The timeout gap is from 0 -> infinity, where 0 was established at job creation.
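The window types above can be sketched as plain functions that map a segment's Event Time to the window extents it belongs to. This is an illustration in ordinary Clojure, not Onyx's implementation. It assumes numeric, non-negative event times, extents aligned to 0, and the conventional half-open `[start end)` extent; all function names are hypothetical.

```clojure
(defn fixed-window
  "Returns the [start end) extent of width `width` that contains time `t`."
  [width t]
  (let [start (* width (quot t width))]
    [start (+ start width)]))

(defn sliding-windows
  "Returns every [start end) extent of width `width`, started every `slide`
  units from 0, that contains time `t` (latest extent first)."
  [width slide t]
  (->> (iterate #(- % slide) (* slide (quot t slide)))
       (take-while #(and (>= % 0) (< t (+ % width))))
       (mapv (fn [start] [start (+ start width)]))))

(defn session-windows
  "Groups segments by `key-fn`, then splits each group into sessions wherever
  two consecutive event times are more than `timeout` apart."
  [timeout key-fn time-fn segments]
  (into {}
        (for [[k segs] (group-by key-fn segments)]
          [k (reduce (fn [sessions seg]
                       (let [prev (peek (peek sessions))]
                         (if (and prev
                                  (<= (- (time-fn seg) (time-fn prev)) timeout))
                           ;; Within the gap: extend the current session.
                           (conj (pop sessions) (conj (peek sessions) seg))
                           ;; Gap exceeded (or first segment): new session.
                           (conj sessions [seg]))))
                     []
                     (sort-by time-fn segs))])))

(fixed-window 10 27)      ;; => [20 30]
(sliding-windows 10 5 27) ;; => [[25 35] [20 30]]
(session-windows 5 :k :t [{:k :a :t 1} {:k :a :t 3} {:k :a :t 20} {:k :b :t 2}])
;; => {:a [[{:k :a, :t 1} {:k :a, :t 3}] [{:k :a, :t 20}]]
;;     :b [[{:k :b, :t 2}]]}
```

A global window needs no function at all: every segment falls into the single extent that spans the whole job.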

Triggers


While windows deal with Event Time, triggers deal with Processing Time. That is to say, triggers care about when something happens during the processing of a stream.

Just like Event Time, this can be any monotonically increasing attribute. It's perfectly valid to trigger on either chronological instants, or sequence ranges (every 10 segments).

When a trigger fires, it does something with the window's state.

Time Based Triggers

Triggers that are time-based fire on user-defined intervals. This is useful for queries like "Report the average temperature every 10 seconds."
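A rough simulation of that query is sketched below. It is a simplification: a real timer trigger fires on the wall clock even when no segments arrive, whereas this sketch only checks the clock when a reading shows up. The `:arrived-at` key (processing time in seconds) and the function name are assumptions made for illustration.

```clojure
(defn run-timer-trigger
  "Feeds `readings` (maps with :arrived-at and :temperature) into a window and
  reports the average of everything seen so far whenever at least `period`
  seconds of processing time have passed since the last report."
  [period readings]
  (:reports
   (reduce (fn [{:keys [state last-fired reports] :as acc}
                {:keys [arrived-at temperature]}]
             (let [state (conj state temperature)]
               (if (>= (- arrived-at last-fired) period)
                 ;; Trigger fires: emit the average and remember when.
                 (assoc acc
                        :state state
                        :last-fired arrived-at
                        :reports (conj reports (/ (reduce + state) (count state))))
                 ;; Not due yet: just keep accumulating state.
                 (assoc acc :state state))))
           {:state [] :last-fired 0 :reports []}
           readings)))

(run-timer-trigger 10 [{:arrived-at 2  :temperature 10}
                       {:arrived-at 11 :temperature 20}
                       {:arrived-at 15 :temperature 30}
                       {:arrived-at 22 :temperature 40}])
;; => [15 25]
```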

Segment Based Triggers

These triggers are identical to time-based triggers, except that they deal with an abstract notion of time (sequence ranges). They are useful for queries like "Report the average wind speed every 100 data points."
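The same idea can be sketched in a few lines: fire once every `n` segments and report the average of that batch. The function name is hypothetical, and the sketch empties the batch after each firing, which is only one possible choice.

```clojure
(defn average-every
  "Reports the average of key `k` once for every complete batch of `n`
  segments. A trailing partial batch produces nothing, because the trigger
  has not fired for it yet."
  [n k segments]
  (->> segments
       (map k)
       (partition n)
       (mapv #(/ (reduce + %) (count %)))))

(average-every 2 :wind-speed
               [{:wind-speed 10} {:wind-speed 20}
                {:wind-speed 30} {:wind-speed 40}
                {:wind-speed 50}])
;; => [15 35]
```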

Punctuation Triggers

These triggers evaluate segments in a window to decide whether they should fire. The evaluation can be any arbitrary predicate. For convenience, Onyx offers two more triggers that build on top of punctuation triggers: :watermark and :percentile-watermark. Punctuation triggers are useful for queries like "Collect all events from user #5 until we receive a :logout event."
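A minimal sketch of the logout query, assuming hypothetical segments with `:user-id` and `:event` keys. The predicate is the punctuation: the window's contents are released the first time it returns true.

```clojure
(defn logout?
  "Punctuation predicate: true for the segment that should fire the trigger."
  [segment]
  (= :logout (:event segment)))

(defn collect-until
  "Returns the segments up to and including the first one matching `pred`,
  or nil if the punctuation never arrives (the trigger never fires)."
  [pred segments]
  (let [[before [punctuation]] (split-with (complement pred) segments)]
    (when punctuation
      (conj (vec before) punctuation))))

(def events
  [{:user-id 5 :event :login}
   {:user-id 7 :event :click}
   {:user-id 5 :event :click}
   {:user-id 5 :event :logout}
   {:user-id 5 :event :login}])

(->> events
     (filter #(= 5 (:user-id %)))
     (collect-until logout?)
     (mapv :event))
;; => [:login :click :logout]
```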

Composite Triggers

Triggers can be combined using :and, :or, and :not. This makes it possible to build very complex triggers, like "Find all users in our sales funnel with 3 items in their cart and 2 past purchases, or 10 items in their cart that have recently logged out."
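How such a composite is written in Onyx is not shown here. As a stand-in, the combination logic can be sketched with Clojure's own predicate combinators: `every-pred` for :and, `some-fn` for :or, and `complement` for :not. The user keys are hypothetical, and the sketch assumes the sales funnel query reads as "(3 items and 2 purchases) or (10 items and recently logged out)".

```clojure
(defn cart-size? [n] (fn [user] (= n (:cart-items user))))
(defn purchases? [n] (fn [user] (= n (:past-purchases user))))

(def fire?
  "Composite predicate: (3 items AND 2 purchases) OR (10 items AND logged out)."
  (some-fn (every-pred (cart-size? 3) (purchases? 2))
           (every-pred (cart-size? 10) :recently-logged-out?)))

(def still-shopping?
  "Negation of the composite, built with complement."
  (complement fire?))

(fire? {:cart-items 3  :past-purchases 2 :recently-logged-out? false}) ;; => true
(fire? {:cart-items 10 :past-purchases 0 :recently-logged-out? true})  ;; => true
(fire? {:cart-items 10 :past-purchases 0 :recently-logged-out? false}) ;; => false
```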

Trigger Refinements

Triggers offer three different refinement modes.

  • Accumulating
    • After firing a trigger, the window retains its state.
    • The retained state can be written to and fired again.
    • It is possible to see the same segments fired many times.
  • Discarding
    • After firing a trigger, the window's state is emptied.
    • Once a segment is fired, it will not be seen a second time.
  • Accumulating and Discarding
    • The state of a window can be both added to and retracted from. The Dataflow Model paper calls this mode accumulating and retracting.
    • Useful for representing datasets where it's important to keep track of when things become untrue.
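The difference between the modes shows up in what each firing emits. The sketch below is a simulation, not Onyx's implementation: `panes` is a hypothetical sequence of batches that arrive between trigger firings, and the third mode follows the Dataflow Model paper's accumulating and retracting behaviour, emitting the previous result as a retraction alongside the new one.

```clojure
(defn fire-panes
  "Fires a trigger after each batch in `panes` and returns what was emitted
  at each firing under the given refinement `mode`."
  [mode panes]
  (:emitted
   (reduce (fn [{:keys [state emitted]} pane]
             (let [prev  state
                   state (into state pane)]
               (case mode
                 ;; State is kept, so earlier segments are emitted again.
                 :accumulating
                 {:state state :emitted (conj emitted state)}
                 ;; State is emptied, so each segment is emitted once.
                 :discarding
                 {:state [] :emitted (conj emitted state)}
                 ;; State is kept and the previous result is retracted.
                 :accumulating-and-retracting
                 {:state state
                  :emitted (conj emitted {:retract prev :emit state})})))
           {:state [] :emitted []}
           panes)))

(fire-panes :accumulating [[1 2] [3]]) ;; => [[1 2] [1 2 3]]
(fire-panes :discarding   [[1 2] [3]]) ;; => [[1 2] [3]]
(fire-panes :accumulating-and-retracting [[1 2] [3]])
;; => [{:retract [], :emit [1 2]} {:retract [1 2], :emit [1 2 3]}]
```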

Examples

With windows and triggers, we can do some interesting operations. The following examples use the Onyx framework, but they could be extended to any other window/trigger implementation (for example, Google Dataflow).

Caveat: At the time of writing (December 9th, 2015), Onyx does not support Predicate Windows or Accumulating and Discarding refinements. These improvements will be made in the near future.

Scenario

We have a weather analytics platform that allows meteorologists across the globe to query a large, unbounded stream of sensor data in real time. The sensor data is stored in a Kafka queue and is never deleted. Each of our sensors reports segments to Kafka of the following shape.

[{:sensor-id 1, :temperature 99, :wind-speed 17, :humidity 38, :time "02:34"}
 {:sensor-id 2, :temperature 30, :wind-speed 20, :humidity 14, :time "00:46"}
 {:sensor-id 3, :temperature 10, :wind-speed 28, :humidity 51, :time "06:04"}
 {:sensor-id 1, :temperature 29, :wind-speed 93, :humidity 57, :time "10:28"}
 {:sensor-id 2, :temperature 41, :wind-speed 96, :humidity 16, :time "11:22"}
 {:sensor-id 3, :temperature 30, :wind-speed 79, :humidity 92, :time "11:23"}
 {:sensor-id 1, :temperature 18, :wind-speed 89, :humidity 97, :time "03:38"}
 {:sensor-id 2, :temperature 97, :wind-speed 44, :humidity 15, :time "11:46"}
 {:sensor-id 3, :temperature 75, :wind-speed 96, :humidity 78, :time "01:43"}]
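Windowing on Event Time needs a value that can be ordered and subtracted, so the "HH:MM" strings above would first have to be turned into numbers. A minimal sketch, assuming all readings fall on the same day (the sample data carries no date) and using a hypothetical helper name:

```clojure
(require '[clojure.string :as str])

(defn time->minutes
  "Converts an \"HH:MM\" string into minutes since midnight."
  [hhmm]
  (let [[h m] (map #(Long/parseLong %) (str/split hhmm #":"))]
    (+ (* 60 h) m)))

(defn with-event-time
  "Adds a numeric :event-time (hypothetical key) derived from :time."
  [segment]
  (assoc segment :event-time (time->minutes (:time segment))))

(time->minutes "02:34") ;; => 154
(:event-time (with-event-time {:sensor-id 2, :time "00:46"})) ;; => 46
```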

I want to...


  • Write new segments to our backup system every 10 minutes.
    • Window: Global
    • Trigger: Time Based (10 minutes)
    • Refinement: Discarding

It would be trivial to change "every 10 minutes" to "every 1000 segments": just switch the trigger type from time-based to segment-based. There may be scenarios where you want to back up segments when either 1000 segments have passed by or 10 minutes have elapsed. That can be achieved with composite triggers.
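In Onyx, windows and triggers are declared as data. The maps below sketch how the backup example might look. The key names are recalled from the Onyx 0.8 documentation and should be checked against the version in use. The window id, the `:backup` task name, the `:event-time` key, and the `:my.app/write-to-backup` sync function are hypothetical.

```clojure
;; Assumed Onyx 0.8-style window map: one global window that collects
;; every segment seen by the (hypothetical) :backup task.
(def windows
  [{:window/id :backup-window
    :window/task :backup
    :window/type :global
    :window/aggregation :onyx.windowing.aggregation/conj
    :window/window-key :event-time}])

;; Time-based trigger: hand the window contents to the sync function every
;; 10 minutes, emptying the state afterwards (discarding refinement).
(def timer-trigger
  {:trigger/window-id :backup-window
   :trigger/refinement :discarding
   :trigger/on :timer
   :trigger/period [10 :minutes]
   :trigger/sync :my.app/write-to-backup})

;; Segment-based variant: the same trigger, but firing every 1000 segments.
(def segment-trigger
  (-> timer-trigger
      (dissoc :trigger/period)
      (assoc :trigger/on :segment
             :trigger/threshold [1000 :elements])))
```

Only the `:trigger/on` value and its accompanying period or threshold differ between the two variants, which is why switching between them is cheap.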


  • Every day, report which sensors are live and still sending back data.
    • Window: Fixed Window (1 day)
    • Trigger: Time Based (1 day)
    • Refinement: Discarding

  • Every hour, report the average temperature over the last 24 hours.
    • Window: Sliding Window (range: 24hrs slide: 1hr)
    • Trigger: Watermark
    • Refinement: Accumulating

We use a watermark trigger here, accepting that a report is only emitted once a segment beyond the window's range is observed. This causes trouble if all of the sensors suddenly go down, but it is fine in situations where the absence of a trigger firing is itself meaningful.

We also use accumulating refinement because, if a sensor is up but its segment was re-ordered or delayed in some way, we still want it reported as alive during the 1hr period it reported in.
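The hourly average could be declared along the following lines. As before, this is a sketch: the key names and the `average` aggregation are recalled from the Onyx 0.8 documentation and should be verified, and the window id, the `:report` task, the `:event-time` key, and the `:my.app/report-average` sync function are hypothetical.

```clojure
;; Assumed Onyx 0.8-style sliding window: 24 hours wide, advancing every hour,
;; averaging the :temperature key of the segments in each extent.
(def windows
  [{:window/id :daily-temperature
    :window/task :report
    :window/type :sliding
    :window/aggregation [:onyx.windowing.aggregation/average :temperature]
    :window/window-key :event-time
    :window/range [24 :hours]
    :window/slide [1 :hour]}])

;; Watermark trigger with accumulating refinement: the window keeps its state
;; after firing, so late segments can still update an earlier report.
(def triggers
  [{:trigger/window-id :daily-temperature
    :trigger/refinement :accumulating
    :trigger/on :watermark
    :trigger/sync :my.app/report-average}])
```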




1: Almost everything in this post can be gleaned from the Dataflow Model paper. I am re-writing what I have read in order to better understand it myself and show how it can be applied to practical problems.

2: Event Time is date-time generalized. It can also be the count of a segment in relation to a larger ordering of segments. Event Time can be any monotonically increasing value.
