Skip to content

Instantly share code, notes, and snippets.

@wjrmffldrhrl
Last active March 30, 2022 13:42
Show Gist options
  • Save wjrmffldrhrl/13137676c3394c56eff8fce19383275e to your computer and use it in GitHub Desktop.
Save wjrmffldrhrl/13137676c3394c56eff8fce19383275e to your computer and use it in GitHub Desktop.
Streaming Analytics on Flink 2.md

Window

플링크는 매우 풍부한 윈도우 시멘틱을 제공합니다.

Introduction

스트림 처리를 수행할 때 아래와 같은 질문에 답하기 위해 스트림의 제한된 하위 집합에 대해 집계된 분석을 계산하려는 것은 자연스러운 일입니다.

It is natural when doing stream processing to want to compute aggregated analytics on bounded subsets of the streams in order to answer questions like these:

  • 분당 페이지 뷰 수
  • 주당 사용자 별 세션 수
  • 분당 센서별 최대 온도

플링크에서 윈도우 분석 계산은 두 가지 주요 추상화에 의존합니다.

  • 이벤트를 윈도우에 할당하는 윈도우 할당기
  • 윈도우에 할당 된 이벤트에 적용하는 윈도우 함수

플링크의 윈도우 API는 윈도우 함수를 언제 호출하는지 정의하는 트리거(Trigger)와 윈도우에 수집된 요소를 제거할 수 있는 에빅터(Eveictor)라는 개념도 있습니다.

간단한 형태로, 당신은 키 스트림에 윈도우 함수를 적용할 수 있습니다.

stream.
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce|aggregate|process(<window function>);

또한 키가 없는 스트림에도 윈도우 함수를 사용할 수 있습니다.
그러나 아래 예시처럼 구성할 경우 병렬적으로 수행되지 않습니다.

stream.
    .windowAll(<window assigner>)
    .reduce|aggregate|process(<window function>);

Window Assigners

플링크는 아래 그림과 같은 내장된 몇 가지 윈도우 할당자를 가지고 있습니다. image

몇 가지 예로 윈도우 할당자가 어떻게 사용되고 어떻게 지정하는지 알 수 있습니다.

  • Tumbling time windows
    • 분당 페이지 뷰
    • TumblingEventTimeWindows.of(Time.minutes(1))
  • Sliding time windows
    • 10초마다 계산되는 분당 페이지 뷰
    • SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
  • Session windows
    • 세션당 페이지 뷰, 세션의 간격이 최소 30분이 되는걸 의미합니다.
    • EventTimeSessionWindows.withGap(Time.minutes(30)) 기간은 Time.milliseconds(n), Time.seconds(n), Time.minutes(n), Time.hours(n), Time.days(n)로 지정할 수 있습니다.

시간 기반 윈도우(세션 윈도우 포함) 할당자는 이벤트 시간과 처리시간 특성으로 제공됩니다. 두 종류의 시간 윈도우 사이에는 중요한 절충점이 있습니다.
처리 시간 윈도우를 사용하면 당신은 아래 제한사항들을 허용해야 합니다.

  • 과거 데이터는 정확하게 처리할 수 없습니다.
  • 정렬이 되지 않은 데이터는 정확하게 처리할 수 없습니다.
  • 결과는 결정적이지 않을 것입니다. 그러나 지연이 적다는 장점이 있습니다.

카운트에 기반한 윈도우로 작업한다면, 배치가 끝나기 전 까지 결과가 나오지 않는다는걸 명심해야 합니다. 시간 초과에 관련한 옵션이 없으며, 직접 트리거를 구현해야 합니다.

글로벌 윈도우 할당자는 같은 글로벌 윈도우에 모든 이벤트를 할당합니다. 커스텀 트리거를 사용하여 커스텀 윈도우를 수행하려고 할 때만 유용합니다.

Window Functions

당신은 윈도우의 내용물을 어떻게 처리할 것인지에 대한 세 가지 기본적인 옵션이 있습니다.

  1. 윈도우의 내용물과 Iterable을 넘기는 ProcessWindowFunction를 사용하여 배치처리
  2. 각 이벤트가 윈도우에 할당될 때 마다 ReduceFunction이나 AggregateFunction를 호출하여 점진적으로 처리
  3. 둘의 조합으로, 윈도우가 트리거될 때 ReduceFunction이나 AggregateFunction의 집계 전 결과가 ProcessWindowFunction에 제공
  1. as a batch, using a ProcessWindowFunction that will be passed an Iterable with the window’s contents;
  2. incrementally, with a ReduceFunction or an AggregateFunction that is called as each event is assigned to the window;
  3. or with a combination of the two, wherein the pre-aggregated results of a ReduceFunction or an AggregateFunction are supplied to a ProcessWindowFunction when the window is triggered.

1부터 3까지의 방법의 예시가 아래에 있습니다.
각 구현은 각 센서에서 발생한 1분 단위 이벤트 시간 윈도우의 피크값을 찾고있으며, 키값과 윈도우가 끝나는 시간, 최대값을 포함한 튜플을 스트림으로 생성합니다.

ProcessWindowFunction Example

DataStream<SensorReading> input = ...

input
    .keyBy(x -> x.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new MyWastefulMax());

public static class MyWastefulMax extends ProcessWindowFunction<
        SensorReading,                  // input type
        Tuple3<String, Long, Integer>,  // output type
        String,                         // key type
        TimeWindow> {                   // window type
    
    @Override
    public void process(
            String key,
            Context context, 
            Iterable<SensorReading> events,
            Collector<Tuple3<String, Long, Integer>> out) {

        int max = 0;
        for (SensorReading event : events) {
            max = Math.max(event.value, max);
        }
        out.collect(Tuple3.of(key, context.window().getEnd(), max));
    }
}

해당 구현에서 기억해야 할 몇 가지가 있습니다.

  • 윈도우에 할당되는 모든 이벤트는 윈도우가 실행될때 까지 키 기반 플링크 상태에 저장되어야 합니다. 이 작업은 잠재적으로 꽤나 자원이 많이 소모됩니다.
  • ProcessWindowFunction은 윈도우 정보를 포함하고 있는 Context 객체를 전달합니다. 해당 객체의 인터페이스는 아래와 같습니다.
public abstract class Context implements java.io.Serializable {
    public abstract W window();
    
    public abstract long currentProcessingTime();
    public abstract long currentWatermark();

    public abstract KeyedStateStore windowState();
    public abstract KeyedStateStore globalState();
}

windowStateglobalState는 해당 키의 모든 윈도우에 대한 키별, 윈도우 별 또는 전역 정보를 저장할 수 있는 위치입니다.
이것은 꽤나 유용합니다. 예를 들어 당신이 현재 윈도우에 대한 무언가를 기록하고 이것을 다음 윈도우를 처리할 때 사용할 수 있습니다.

Incremental Aggregation Example

DataStream<SensorReading> input = ...

input
    .keyBy(x -> x.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(new MyReducingMax(), new MyWindowFunction());

private static class MyReducingMax implements ReduceFunction<SensorReading> {
    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        return r1.value() > r2.value() ? r1 : r2;
    }
}

private static class MyWindowFunction extends ProcessWindowFunction<
    SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

    @Override
    public void process(
            String key,
            Context context,
            Iterable<SensorReading> maxReading,
            Collector<Tuple3<String, Long, SensorReading>> out) {

        SensorReading max = maxReading.iterator().next();
        out.collect(Tuple3.of(key, context.window().getEnd(), max));
    }
}

Iterable<SensorReading>에는 MyReducingMax에서 계산한 사전 집계 최대값인 정확히 하나의 판독값이 포함됩니다.

Late Events

기본적으로, 이벤트 시간 윈도우를 사용하면 지연 이벤트는 제거됩니다. 여기에는 이것을 다룰 수 있도록 윈도우 API의 두 가지 옵션이 있습니다.

Side Outputs라고 불리는 방법으로 삭제될 이벤트가 대신 대체 출력 스트림으로 수집되도록 정렬할 수 있습니다.

OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Event> result = stream.
    .keyBy(...)
    .window(...)
    .sideOutputLateData(lateTag)
    .process(...);
  
DataStream<Event> lateStream = result.getSideOutput(lateTag);

또한 지연된 이벤트가 적절한 윈도우에 계속 할당되는 허용된 지연 간격을 지정할 수 있습니다. 기본적으로는 각각의 지연된 이벤트는 윈도우 함수를 다시 호출합니다.

기본적으로 허가된 지연은 0입니다. 즉, 워터마크 뒤의 요소들은 제거될 것입니다.(아니면 side output으로 전달됩니다.)

stream.
    .keyBy(...)
    .window(...)
    .allowedLateness(Time.seconds(10))
    .process(...);

만약 지연 허가가 0보다 크다면, 정말 늦어서 제거될 이벤트들만 side output으로 전달됩니다.

Surprises

몇 가지 측면에서 플링크의 윈도우 API는 행동이 명확하지 않을 수 있습니다. 주기적으로 질문하는 내용들 중 몇 가지를 다뤄보겠습니다. 예를 들어, 만약 당신이 24시간 길이의 윈도우를 15분마다 생성한다면 각 이벤트는 4 * 24 = 96개의 이벤트에 복사 될 것입니다.

Sliding Windows Make Copies

슬라이딩 윈도우 할당자들은 아주 많은 윈도우 객체를 생성할 수 있으며, 관련된 모든 윈도우에 각각의 이벤트를 복사할 것입니다.

Time Windows are Aligned to the Epoch

한 시간 동안의 처리 시간 창을 사용하고 12:05에 애플리케이션 실행을 시작한다고 해서 첫 번째 창이 1:05에 닫히는 것은 아닙니다.
첫 번째 창은 55분 길이로 1시에 닫힙니다.

그러나, 텀블링 및 슬라이딩 윈도우 할당자는 창의 정렬을 변경하는데 사용할 수 있는 optional offset parameter를 가지고 있습니다.

Windows Can Follow Windows

예를 들어 아래 예시와 같이 동작할 수 있습니다.

stream
    .keyBy(t -> t.key)
    .window(<window assigner>)
    .reduce(<reduce function>)
    .windowAll(<same window assigner>)
    .reduce(<same reduce function>);

당신은 플링크의 런타임은 위 동작을 병렬적으로 사전 집계를 수행하기에 충분히 똑똑하다고 기대하겠지만, 아닙니다.
이것이 작동하는 이유는 시간 윈도우에서 생성된 이벤트에 윈도우 끝의 시간을 기준으로 타임스탬프가 할당되기 때문입니다.

예를 들어, 한 시간 동안 생성되는 모든 이벤트들은 한 시간의 끝을 표시하는 타임 스탬프가 있습니다.

No Results for Empty TimeWindows

윈도우들은 이벤트들이 할당됐을 때 생성됩니다. 그러므로 주어진 시간에 생성된 이벤트가 없다면 결과는 생성되지 않습니다.

Late Events Can Cause Late Merges

세션 윈도우들은 합쳐질 수 있는 윈도우의 추상화를 기반으로 합니다. 각 요소는 새로운 윈도우에 할당되고, 윈도우간의 간격이 충분히 작다면 합쳐집니다. 이 방법에서는 지연된 이벤트가 이전에 분리됐던 세션을 잇는 다리가 되어 지연된 병합을 발생시킬 수 있습니다.

Reference

Streaming Analytics - Window

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment