DataStream<String> adhStream = env.addSource(eventSource) .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(60)) .withTimestampAssigner((o, l) -> { try { JsonNode event = mapper.readTree(o); return ZonedDateTime.parse( event.get("creation").get("dateTime").asText() , DateTimeFormatter.ISO_OFFSET_DATE_TIME ) .toInstant() .toEpochMilli(); } catch (JsonProcessingException e) { e.printStackTrace(); } return 1000; }) ); SingleOutputStreamOperator<MyOutput> outputStream = adhStream .keyBy(x -> { try { JsonNode event = mapper.readTree(x); return event.get("someKey").asText(); } catch (Exception e) { System.out.println("Cannot keyby"); return ""; } }).window(TumblingEventTimeWindows.of(Time.seconds(300))) .allowedLateness(Time.seconds(120)) .process(new ProcessWindow()); outputStream.addSink(mySink);