The code below may be invalid; it is being discussed on the Kafka users mailing list: http://mail-archives.apache.org/mod_mbox/kafka-users/201701.mbox/%3cCABQKjkKMwQJKzzvJbFS6jHigmQgJ675aMhsqiRptmasf4kZF4A@mail.gmail.com%3e
Gist: nfo/6df4d1076af9da5fd1c29b0ad4564f2a
Last active: January 16, 2017 22:54
Kafka Streams: handling records that share the same ID and carry updated metrics.
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// `props`, `builder`, INPUT_TOPIC_NAME and the serdes are defined elsewhere (not shown in the gist).

// This custom timestamp extractor builds the timestamp from the record's "internal" timestamp,
// minus the time spent since the delivery of the first version of the record.
// This keeps records with the same ID in the same windows.
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class);

// `TestKey` contains the record ID.
// `TestValue` contains the incremented version number `v`, the time spent since the first
// version of the record was sent `tsDiff`, and a metric value `metric`.
KStream<TestKey, TestValue> stream = builder.stream(INPUT_TOPIC_NAME);

// Build windowed KTables, using `reduce` to keep only the latest version of each record
// (1-second windows, retained for 60 seconds; "reducer" is the state store name).
KTable<Windowed<TestKey>, TestValue> tables = stream.groupByKey().reduce(
        (v1, v2) -> v1.getV() > v2.getV() ? v1 : v2,
        TimeWindows.of(1000).until(60000),
        "reducer"
);

KTable<Windowed<TestKey>, Long> aggregates = tables
        // No change needed to the key
        .groupBy((k, v) -> new KeyValue<>(k, v), windowedKeySerde, testValueSerde)
        // Simple aggregation, summing the metric value
        .aggregate(
                () -> 0L,
                (k, v, a) -> a + v.getMetric(),
                (k, v, a) -> a - v.getMetric(),
                longSerde,
                "aggregate"
        );

aggregates.print();
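To see why the aggregation needs both an adder and a subtractor, here is a minimal plain-Java sketch (no Kafka dependency) of the two steps for a single window. It assumes, as Kafka Streams does for `KTable.groupBy(...).aggregate(...)`, that when the reduced row for a key changes, the previous row is passed to the subtractor and the new row to the adder. `ReduceThenAggregateSketch`, `Pipeline`, and `Value` are hypothetical stand-ins for the gist's types, and the key is left unchanged as in the gist's `groupBy`.

```java
import java.util.HashMap;
import java.util.Map;

public class ReduceThenAggregateSketch {

    // Hypothetical stand-in for TestValue: version number `v` and metric value.
    static final class Value {
        final int v;
        final long metric;

        Value(int v, long metric) {
            this.v = v;
            this.metric = metric;
        }
    }

    // Simulates one window: the reduce result and the aggregate built on top of it.
    static final class Pipeline {
        // Reduce step: latest version seen per record ID.
        final Map<String, Value> reduced = new HashMap<>();
        // Aggregate step: running sum per regrouped key (key unchanged, as in the gist).
        final Map<String, Long> aggregates = new HashMap<>();

        void process(String id, Value incoming) {
            Value old = reduced.get(id);
            // Same rule as the gist's reducer: keep the stored value only if its version is higher.
            Value updated = (old == null || incoming.v >= old.v) ? incoming : old;
            reduced.put(id, updated);
            if (old != null) {
                // Subtractor (k, v, a) -> a - v.getMetric(), applied to the previous row.
                aggregates.merge(id, -old.metric, Long::sum);
            }
            // Adder (k, v, a) -> a + v.getMetric() for the new row; a missing key acts as 0L.
            aggregates.merge(id, updated.metric, Long::sum);
        }
    }

    public static void main(String[] args) {
        Pipeline p = new Pipeline();
        p.process("rec-1", new Value(1, 10L));
        p.process("rec-1", new Value(2, 25L)); // newer version replaces version 1
        p.process("rec-1", new Value(1, 10L)); // late copy of version 1: reducer keeps version 2
        p.process("rec-2", new Value(1, 7L));
        System.out.println("rec-1 -> " + p.aggregates.get("rec-1")); // rec-1 -> 25
        System.out.println("rec-2 -> " + p.aggregates.get("rec-2")); // rec-2 -> 7
    }
}
```

In this sketch, because the regrouping keeps the key, each aggregate ends up equal to the metric of the latest version for that ID, and a late copy of an older version is dropped by the reducer without changing the sum.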
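import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Single-argument extract() as in the Kafka Streams version this gist targets;
// newer releases add a second `previousTimestamp` parameter.
public class TestTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // Handle the records sent periodically with updated metrics:
        // set their timestamp to the timestamp of the first version beacon,
        // using the `tsDiff` field computed by the browser.
        // (`instanceof` is false for null, so no separate null check is needed.)
        if (record.value() instanceof TestValue) {
            TestValue value = (TestValue) record.value();
            if (value.getTsDiff() > 0) {
                return record.timestamp() - value.getTsDiff();
            }
        }
        // First versions (tsDiff <= 0) and other values keep the record's own timestamp.
        return record.timestamp();
    }
}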
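The extractor's arithmetic can be checked in isolation with a small plain-Java sketch (no Kafka dependency). It assumes `tsDiff` is in milliseconds, like `record.timestamp()`, and that `TimeWindows.of(1000)` without `advanceBy` gives tumbling 1-second windows starting at multiples of 1000 ms. `ExtractorWindowSketch`, `effectiveTimestamp`, and `windowStart` are hypothetical names.

```java
public class ExtractorWindowSketch {

    // Matches TimeWindows.of(1000) in the gist (assumed tumbling, size in ms).
    static final long WINDOW_SIZE_MS = 1000L;

    // Same arithmetic as TestTimestampExtractor.extract(): subtract tsDiff only when positive.
    static long effectiveTimestamp(long recordTimestamp, long tsDiff) {
        return tsDiff > 0 ? recordTimestamp - tsDiff : recordTimestamp;
    }

    // Start of the tumbling window containing ts (non-negative timestamps assumed).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        long first = effectiveTimestamp(5_300L, 0L);      // first version: tsDiff = 0
        long update = effectiveTimestamp(7_100L, 1_800L); // update sent 1.8 s after the first version
        System.out.println(windowStart(first));  // 5000
        System.out.println(windowStart(update)); // 5000
        System.out.println(windowStart(7_100L)); // 7000 (raw timestamp, without the adjustment)
    }
}
```

Without the adjustment, the update at 7100 ms would fall into the [7000, 8000) window and be counted separately from its first version; with it, both versions share the [5000, 6000) window, so the reducer can deduplicate them.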