// Reads raw JSON events from eventSource and attaches event-time timestamps and watermarks.
// Watermarks tolerate events arriving up to 60 seconds out of order. The event time is taken
// from the ISO-8601 offset date-time found at creation.dateTime in each JSON event.
DataStream<String> adhStream = env.addSource(eventSource)
				.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(60))
						.withTimestampAssigner((o, l) -> {
							try
							{
								JsonNode event = mapper.readTree(o);
								// path() yields a "missing" node instead of null when a field is
								// absent, so an incomplete event surfaces as a date parse failure
								// below rather than as a NullPointerException.
								return ZonedDateTime.parse(
												event.path("creation").path("dateTime").asText(),
												DateTimeFormatter.ISO_OFFSET_DATE_TIME
										)
										.toInstant()
										.toEpochMilli();
							}
							catch (JsonProcessingException | RuntimeException e)
							{
								// One bad record (malformed JSON, missing field, unparseable date)
								// must not fail the whole operator; report it and fall through to
								// the fallback timestamp, mirroring the tolerant keyBy selector.
								e.printStackTrace();
							}
							// Fallback timestamp for events whose time cannot be determined:
							// 1000 ms after the epoch. NOTE(review): this will normally be far
							// behind the current watermark, so downstream event-time windows are
							// expected to treat such records as late - confirm this is intended.
							return 1000L;
						})
				);

		// Partition the events by their "someKey" JSON field, group each key's events into
		// tumbling event-time windows of 300 seconds (accepting late events for a further
		// 120 seconds), run ProcessWindow over every window, and send the results to mySink.
		SingleOutputStreamOperator<MyOutput> outputStream = adhStream
				.keyBy(rawEvent -> {
					// Events whose key cannot be extracted all share the empty-string key.
					String key = "";
					try
					{
						key = mapper.readTree(rawEvent).get("someKey").asText();
					}
					catch (Exception e)
					{
						System.out.println("Cannot keyby");
					}
					return key;
				})
				.window(TumblingEventTimeWindows.of(Time.seconds(300)))
				.allowedLateness(Time.seconds(120))
				.process(new ProcessWindow());

		outputStream.addSink(mySink);