@nfo
Created January 16, 2017 22:13
Kafka Streams - Custom timestamp extractor, from a `long` field named "timestamp"
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

/**
 * Handle records with a timestamp in their Avro value.
 * Expects a LONG field named "timestamp".
 * Any problem with the value makes this extractor return the record's internal timestamp.
 */
public class InValueTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // Is the value an Avro record? (instanceof is also false for a null value.)
        // A null record is not handled: the fallback below needs the record anyway.
        if (record.value() instanceof GenericRecord) {
            GenericRecord value = (GenericRecord) record.value();
            // Does it have a timestamp field, and is the field a LONG?
            Schema.Field field = value.getSchema().getField("timestamp");
            if (field != null && field.schema().getType().equals(Schema.Type.LONG)) {
                // Get the timestamp from the record value
                return (long) value.get(field.pos());
            }
        }
        // Fall back to the timestamp stored in the Kafka record itself
        return record.timestamp();
    }
}
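
Usage sketch (not part of the original gist): Kafka Streams picks up a custom extractor through its configuration. The snippet below only builds the `Properties` with plain string keys so it runs without Kafka on the classpath. The application id, broker address, and the `com.example` package are hypothetical placeholders. The `timestamp.extractor` key is the one used by the 0.10.x releases this gist was written against; newer releases use `default.timestamp.extractor` instead.

```java
import java.util.Properties;

class ExtractorConfigSketch {
    // Builds Kafka Streams properties that register the extractor by class name.
    static Properties streamsProperties(String extractorClassName) {
        Properties props = new Properties();
        props.put("application.id", "timestamp-extractor-demo"); // hypothetical id
        props.put("bootstrap.servers", "localhost:9092");        // hypothetical broker
        // Key for Kafka Streams 0.10.x; newer releases: "default.timestamp.extractor"
        props.put("timestamp.extractor", extractorClassName);
        return props;
    }

    public static void main(String[] args) {
        // Assumes the extractor class lives in a (hypothetical) com.example package
        Properties props = streamsProperties("com.example.InValueTimestampExtractor");
        System.out.println(props.getProperty("timestamp.extractor"));
    }
}
```

The resulting `Properties` would then be passed to the Kafka Streams configuration when building the topology.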