@nfo
Last active January 16, 2017 22:54
Kafka Streams - Handling records with same ID with updated metrics.
// This custom timestamp extractor derives each record's timestamp from its "internal" timestamp,
// minus the time elapsed since the first version of the record was sent.
// This keeps all versions of a record with the same ID in the same window.
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class);

// `TestKey` contains the record ID.
// `TestValue` contains the incrementing version number `v`, the time elapsed since the first
// version of the record was sent `tsDiff`, and a metric value `metric`.
KStream<TestKey, TestValue> stream = builder.stream(INPUT_TOPIC_NAME);

// Build windowed KTables, using `reduce` to keep only the latest version of each record.
KTable<Windowed<TestKey>, TestValue> tables = stream.groupByKey().reduce(
    (v1, v2) -> v1.getV() > v2.getV() ? v1 : v2,
    TimeWindows.of(1000).until(60000), // 1-second windows, retained for at least 60 seconds
    "reducer"
);

KTable<Windowed<TestKey>, Long> aggregates = tables
    // No change needed to the key
    .groupBy((k, v) -> new KeyValue<>(k, v), windowedKeySerde, testValueSerde)
    // Simple aggregation, summing the metric value
    .aggregate(
        () -> 0L,
        (k, v, a) -> a + v.getMetric(), // adder: applied to the new value of a key
        (k, v, a) -> a - v.getMetric(), // subtractor: applied to the value being replaced
        longSerde,
        "aggregate"
    );
aggregates.print();
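The `reduce` and `aggregate` steps above can be illustrated without a Kafka cluster. The following is only a rough sketch in plain Java, not the Kafka Streams implementation: the class `LatestVersionAggregateSketch`, its nested `Value` type, and the `update` and `aggregateFor` methods are hypothetical names introduced for illustration, and a `String` stands in for the windowed `TestKey`. It shows why an updated version of a record replaces the previous metric instead of being counted twice: the old value is subtracted before the kept value is added.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (no Kafka dependency) of "keep latest version, then aggregate".
public class LatestVersionAggregateSketch {

    // Hypothetical stand-in for TestValue: a version number and a metric.
    static final class Value {
        final int version;
        final long metric;

        Value(int version, long metric) {
            this.version = version;
            this.metric = metric;
        }
    }

    private final Map<String, Value> latest = new HashMap<>();     // plays the role of the "reducer" table
    private final Map<String, Long> aggregates = new HashMap<>();  // plays the role of the "aggregate" table

    void update(String id, int version, long metric) {
        Value incoming = new Value(version, metric);
        Value previous = latest.get(id);

        // Same rule as the reducer: keep whichever value has the higher version.
        Value kept = (previous != null && previous.version > incoming.version) ? previous : incoming;
        latest.put(id, kept);

        long aggregate = aggregates.getOrDefault(id, 0L);
        if (previous != null) {
            aggregate -= previous.metric; // subtractor: remove the value being replaced
        }
        aggregate += kept.metric;         // adder: add the value that is kept
        aggregates.put(id, aggregate);
    }

    long aggregateFor(String id) {
        return aggregates.getOrDefault(id, 0L);
    }

    public static void main(String[] args) {
        LatestVersionAggregateSketch sketch = new LatestVersionAggregateSketch();
        sketch.update("a", 1, 5L);  // first version
        sketch.update("a", 2, 8L);  // updated metric replaces the first one
        sketch.update("a", 1, 5L);  // late duplicate of version 1 is ignored
        System.out.println(sketch.aggregateFor("a")); // prints 8
    }
}
```

Because the key is left unchanged by `groupBy`, each aggregate covers a single record ID, so the result is simply the metric of the latest version.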
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class TestTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // Handle the records sent periodically with updated metrics.
        // We set their timestamp to the timestamp of the first version of the record.
        // For that, we use the `tsDiff` field computed by the browser.
        if (record.value() instanceof TestValue) {
            // `instanceof` is false for null, so no separate null check is needed.
            TestValue value = (TestValue) record.value();
            if (value.getTsDiff() > 0) {
                return record.timestamp() - value.getTsDiff();
            }
        }
        // First version of a record (or any other value type): keep the original timestamp.
        return record.timestamp();
    }
}
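The effect of the extractor on windowing can be checked with simple arithmetic. The sketch below is a hedged illustration in plain Java, not part of the gist: `WindowAlignmentSketch`, `windowStart`, and `adjustedTimestamp` are hypothetical names, and the timestamps are made-up example values. It assumes tumbling windows aligned to the epoch, so a window's start is the timestamp rounded down to a multiple of the window size (1000 ms here, as in `TimeWindows.of(1000)`).

```java
// Plain-Java sketch (no Kafka dependency) of the extractor's timestamp arithmetic.
public class WindowAlignmentSketch {

    // Hypothetical helper: start of the epoch-aligned tumbling window containing the timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Same rule as the extractor: subtract tsDiff only when it is positive.
    static long adjustedTimestamp(long recordTimestampMs, long tsDiffMs) {
        return tsDiffMs > 0 ? recordTimestampMs - tsDiffMs : recordTimestampMs;
    }

    public static void main(String[] args) {
        long windowSizeMs = 1000L;

        long firstVersionTs = 10_200L;   // first version, tsDiff = 0
        long secondVersionTs = 12_700L;  // second version, sent 2500 ms later
        long secondTsDiff = 2_500L;

        // Without the adjustment, the two versions land in different windows.
        System.out.println(windowStart(firstVersionTs, windowSizeMs));   // prints 10000
        System.out.println(windowStart(secondVersionTs, windowSizeMs));  // prints 12000

        // With the adjustment, the second version falls back into the first version's window.
        long adjusted = adjustedTimestamp(secondVersionTs, secondTsDiff);
        System.out.println(windowStart(adjusted, windowSizeMs));         // prints 10000
    }
}
```

This only holds as long as `tsDiff` accurately reflects the time elapsed since the first version was sent; any drift larger than the distance to a window boundary could still place a later version in a neighbouring window.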