Skip to content

Instantly share code, notes, and snippets.

@derjust
Last active December 6, 2019 21:34
Show Gist options
  • Save derjust/0649d497e3ab08d1e2ce53d362217edd to your computer and use it in GitHub Desktop.
Save derjust/0649d497e3ab08d1e2ce53d362217edd to your computer and use it in GitHub Desktop.
package de.zeeman.welp;
import java.util.List;
import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.fs.s3base.shaded.org.joda.time.DateTime;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.util.Collector;
public class Test {
/**
 * Immutable value carrying an id and the time at which that id was last modified.
 * Callers in this file supply the modification time in epoch milliseconds.
 */
static class Event {
    /** Identifier of the event. */
    private final long id;
    /** Last-modified time; treated as milliseconds by {@link #toString()}. */
    private final long lastModifiedDate;

    private Event(long eventId, long modifiedAt) {
        this.id = eventId;
        this.lastModifiedDate = modifiedAt;
    }

    /**
     * Creates an event.
     *
     * @param id identifier of the event
     * @param lastModifiedDate last-modified time
     * @return a new event holding both values
     */
    static Event of(long id, long lastModifiedDate) {
        return new Event(id, lastModifiedDate);
    }

    /**
     * Renders the event as {@code "<id> (<n>) "}, where {@code n} is the modification
     * time divided by 1000 and then reduced modulo 10000, which keeps log lines short.
     */
    @Override
    public String toString() {
        final long shortSeconds = (lastModifiedDate / 1000) % 10000;
        return new StringBuilder()
            .append(id)
            .append(" (")
            .append(shortSeconds)
            .append(") ")
            .toString();
    }
}
/**
 * Builds and runs a small Flink job: four in-memory events are keyed by id and, within
 * 5-second tumbling processing-time windows, reduced to the event with the greatest
 * {@code lastModifiedDate} per key. Every reduction step, every window result and every
 * record reaching the sink is printed to standard out.
 *
 * <p>NOTE(review): the source is a bounded collection, so the job may finish before any
 * processing-time trigger fires and window results may never be printed; confirm against
 * the Flink version in use, and switch to an unbounded source if output is required.
 *
 * @param args ignored
 * @throws Exception if job execution fails
 */
public static void main(String... args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    AssignerWithPunctuatedWatermarks<Event> tsWatermark = new AssignerWithPunctuatedWatermarks<Event>() {
        @Override
        public long extractTimestamp(Event element, long previousElementTimestamp) {
            // Use the later of the event's own modification time and the timestamp
            // already attached to the record.
            return Math.max(element.lastModifiedDate, previousElementTimestamp);
        }

        @Override
        public Watermark checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) {
            // The windows below are processing-time based, so no watermark is emitted.
            return null;
        }
    };

    // Test data: two keys, with key 1 modified three times over the last few seconds.
    // System.currentTimeMillis() yields the same epoch-millisecond values the shaded
    // Joda DateTime produced, without depending on a Flink-internal class.
    final long now = System.currentTimeMillis();
    List<Event> kafkaEvents = ImmutableList.of(
        Event.of(1, now - 4_000L),
        Event.of(2, now - 3_000L),
        Event.of(1, now - 2_000L),
        Event.of(1, now - 1_000L)
    );

    // Stand-in for a Kafka topic: the events are read from the in-memory collection.
    // Timestamps are assigned exactly once; a second pass with the same assigner would
    // only return the timestamp that is already set.
    DataStream<Event> kafka = env.fromCollection(kafkaEvents)
        .assignTimestampsAndWatermarks(tsWatermark)
        .name("Kafka");

    SingleOutputStreamOperator<Event> stream = kafka
        .keyBy(k -> k.id)
        // Aggregate all events of the current 5-second window.
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        // Fire every second so intermediate results are emitted as time progresses.
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
        .reduce((ReduceFunction<Event>) (value1, value2) -> {
            // Keep the more recently modified event; on a tie the second one wins.
            Event result = value1.lastModifiedDate > value2.lastModifiedDate ? value1 : value2;
            System.out.println("Reducing " + value1 + " and " + value2 + " to " + result);
            return result;
        })
        .name("Find latest event");

    // Pass-through step that logs each window result.
    stream = stream
        .process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                System.out.println("Found event: " + value);
                out.collect(value);
            }
        })
        .name("Map");

    // Sink that prints the wall-clock time next to each record.
    stream.addSink(new SinkFunction<Event>() {
        @Override
        public void invoke(Event value, Context context) {
            System.out.println(System.currentTimeMillis() + " " + value);
        }
    }).name("Sink");

    // execute() blocks until the job has finished, so no trailing sleep is needed to
    // keep main alive; sleeping afterwards would only delay JVM exit.
    env.execute("Test");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment