@allenxwang
Last active August 30, 2019 22:41
Stream processing skeleton code for tracing
// Read trace records from the "traces" Kafka topic.
SingleOutputStreamOperator<Record<TraceContext>> sourceStream =
    getSourceBuilder().fromKafka("traces").withOutputType(TraceContext.class).build();

// Use event time so windows and timers follow the record timestamps.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

sourceStream
    // Tolerate up to 30 seconds of out-of-order records when generating watermarks.
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Record<TraceContext>>(Time.seconds(30)) {
            @Override
            public long extractTimestamp(Record<TraceContext> element) {
                return element.getAttributes(KeystoneAttributes.class).get().getTimestamp();
            }
        })
    .map(r -> new TraceContextRecord(r))
    // Group all records that belong to the same trace.
    .keyBy((KeySelector<TraceContextRecord, String>) value -> value.getTraceContext().getTraceId())
    // One global window per trace ID; the custom trigger decides when it fires.
    .window(GlobalWindows.create())
    // Note: with GlobalWindows no element is ever considered late, so the trigger governs completion.
    .allowedLateness(Time.seconds(TracingTrigger.MAX_TIMEOUT.get()))
    .trigger(createTraceTrigger())
    .process(new ProcessWindowFunction<TraceContextRecord, Record<Map<String, Object>>, String, GlobalWindow>() {
        @Override
        public void process(String s, Context context, Iterable<TraceContextRecord> elements,
                Collector<Record<Map<String, Object>>> out) {
            // Emit one message-loss record per trace ID (the key s).
            out.collect(createMessageLossRecord(s, elements));
        }
    })
    // Publish the results to the "lost_messages" Kafka topic.
    .addSink(getSinkBuilder().toKafka("lost_messages").build());
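
The skeleton above does not show what `createTraceTrigger()` returns. As a rough illustration only, the plain-Java sketch below models one plausible behaviour: a trace fires once the watermark passes the timestamp of its latest record plus a timeout. The class `TraceTimeoutSketch` and its methods are hypothetical names introduced here; this is not the gist author's trigger and it does not use the Flink `Trigger` API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a per-trace inactivity trigger; not the gist author's code. */
class TraceTimeoutSketch {
    /** Latest firing deadline per trace ID, in event-time milliseconds. */
    private final Map<String, Long> deadlines = new HashMap<>();
    private final long timeoutMs;

    TraceTimeoutSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Records an element for the trace and keeps the later of the old and new deadline. */
    void onElement(String traceId, long timestampMs) {
        deadlines.merge(traceId, timestampMs + timeoutMs, Math::max);
    }

    /** Returns true, and forgets the trace, once the watermark has reached its deadline. */
    boolean shouldFire(String traceId, long watermarkMs) {
        Long deadline = deadlines.get(traceId);
        if (deadline != null && watermarkMs >= deadline) {
            deadlines.remove(traceId);
            return true;
        }
        return false;
    }
}
```

In a real Flink job the same idea would presumably be expressed with event-time timers registered from a custom trigger, with the per-trace deadline held in keyed state rather than in an in-memory map.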