Last active
December 6, 2019 21:34
-
-
Save derjust/0649d497e3ab08d1e2ce53d362217edd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package de.zeeman.welp; | |
import java.util.List;

import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.fs.s3base.shaded.org.joda.time.DateTime;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.util.Collector;
public class Test { | |
static class Event { | |
private final long id; | |
private final long lastModifiedDate; | |
private Event(long id, long lastModifiedDate) { | |
this.id = id; | |
this.lastModifiedDate= lastModifiedDate; | |
} | |
static Event of(long id, long lastModifiedDate) { | |
return new Event(id, lastModifiedDate); | |
} | |
public String toString() { | |
return this.id + " (" + this.lastModifiedDate / 1000 % 10000+ ") "; | |
} | |
} | |
public static void main(String... args) throws Exception { | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
AssignerWithPunctuatedWatermarks<Event> tsWatermark = new AssignerWithPunctuatedWatermarks<Event>(){ | |
public long extractTimestamp(Event element, long previousElementTimestamp) { | |
if (element.lastModifiedDate > previousElementTimestamp) { | |
return element.lastModifiedDate; | |
} else { | |
return previousElementTimestamp; | |
} | |
} | |
public Watermark checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) { | |
// as we use ProcessTime no watermarking is needed? | |
return null; | |
} | |
}; | |
DateTime now = DateTime.now(); | |
List<Event> kafkaEvents = ImmutableList.of( | |
Event.of(1, now.minusSeconds(4).getMillis()), | |
Event.of(2, now.minusSeconds(3).getMillis()), | |
Event.of(1, now.minusSeconds(2).getMillis()), | |
Event.of(1, now.minusSeconds(1).getMillis()) | |
); | |
// get input data by connecting to the socket | |
DataStream<Event> kafka = env.fromCollection(kafkaEvents) | |
.assignTimestampsAndWatermarks(tsWatermark) | |
.name("Kafka"); | |
DataStream<Event> joinedStream = kafka | |
.assignTimestampsAndWatermarks(tsWatermark); | |
SingleOutputStreamOperator<Event> stream = joinedStream.keyBy(k -> k.id) | |
// Aggregate all events in the last 5 seconds | |
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) | |
// Let the time be constantly progress | |
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) | |
.reduce((ReduceFunction<Event>) (value1, value2) -> { | |
Event result = value1.lastModifiedDate > value2.lastModifiedDate ? value1 : value2; | |
System.out.println("Reducing " + value1 + " and " + value2 + " to " + result); | |
return result; | |
}).name("Find latest event"); | |
stream = stream | |
.process(new ProcessFunction<Event, Event>() { | |
public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception { | |
System.out.println("Found event: " + value); | |
out.collect(value); | |
} | |
}).name("Map"); | |
stream.addSink(new SinkFunction<Event>() { | |
public void invoke(Event value, Context context) { | |
System.out.println(System.currentTimeMillis() + " " + value); | |
} | |
}).name("Sink"); | |
env.execute("Test"); | |
// Keep main running as we run it straight from the IDE | |
Thread.sleep(60 * 1000); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment