Skip to content

Instantly share code, notes, and snippets.

@birdayz
Created March 26, 2020 14:46
Show Gist options
  • Save birdayz/6b47a9d413cc9151517fdb92c0d876cd to your computer and use it in GitHub Desktop.
Save birdayz/6b47a9d413cc9151517fdb92c0d876cd to your computer and use it in GitHub Desktop.
package com.eon.iotcore.datapoint.compute;
import com.eon.iotcore.datapoint.DatapointChangedEvent;
import java.time.Instant;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
/**
 * Extracts the embedded timestamp of a record (giving you "event-time" semantics).
 *
 * <p>Implements the Kafka Streams {@link TimestampExtractor} interface. When the record value is
 * not of the expected type, the extractor returns the {@code previousTimestamp} it was handed
 * and, if that is negative, the current wall-clock time.
 *
 * <p>NOTE(review): {@code MyType} is neither imported nor declared in the visible part of this
 * file, while {@code DatapointChangedEvent} is imported but not referenced in the visible code;
 * presumably {@code MyType} is a placeholder for that event type. Confirm before use.
 */
public class MyTimestampExtractor implements TimestampExtractor {
// Explicit public no-argument constructor; it performs no initialization.
public MyTimestampExtractor() {
}
/**
 * Returns the timestamp, in epoch milliseconds, to associate with {@code record}.
 *
 * @param record the consumed record whose value is inspected for an embedded timestamp
 * @param previousTimestamp fallback timestamp supplied by the caller; only used when it is
 *     non-negative (presumably a negative value means "unknown"; confirm against the Kafka
 *     Streams {@code TimestampExtractor} documentation)
 * @return the embedded timestamp when the value is a {@code MyType}; otherwise
 *     {@code previousTimestamp} if it is non-negative, else {@code System.currentTimeMillis()}
 */
@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
// Writes a marker line to stdout on every call.
// NOTE(review): looks like leftover debugging output; confirm whether it should stay.
System.out.println("EXTRACT");
// instanceof is false for a null value, so null values take the fallback path below.
if (record.value() instanceof MyType) {
// The code that derives `i` from the record value is elided in this snippet.
// Presumably `i` is a java.time.Instant (Instant is imported and toEpochMilli() is
// called on it). TODO confirm.
.....
return i.toEpochMilli();
}
// Invalid timestamp! Attempt to estimate a new timestamp,
// otherwise fall back to wall-clock time (processing-time).
if (previousTimestamp >= 0) {
return previousTimestamp;
} else {
return System.currentTimeMillis();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment