Skip to content

Instantly share code, notes, and snippets.

@nfo
Created January 16, 2017 18:41
Show Gist options
  • Save nfo/eaf350afb5667a3516593da4d48e757a to your computer and use it in GitHub Desktop.
Save nfo/eaf350afb5667a3516593da4d48e757a to your computer and use it in GitHub Desktop.
Kafka Streams - A serde for timed windows (start + end)
/**
 * Deserializer for {@code Windowed<T>} values laid out as the inner value's bytes
 * followed by two 8-byte big-endian timestamps: the window start, then the window end.
 *
 * @param <T> type produced by the wrapped inner deserializer
 */
public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {

    /** Size in bytes of one serialized timestamp (a {@code long}). */
    private static final int TIMESTAMP_SIZE = 8;

    /** Size in bytes of the trailing start + end timestamps. */
    private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + TIMESTAMP_SIZE;

    private final Deserializer<T> inner;

    /**
     * @param inner deserializer used for the bytes that precede the two timestamps
     */
    public TimeWindowedDeserializer(Deserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // do nothing
    }

    /**
     * Splits {@code data} into the inner payload and the trailing start/end timestamps.
     *
     * @param topic topic the record came from; passed through to the inner deserializer
     * @param data  serialized bytes, or {@code null}
     * @return the windowed value, or {@code null} when {@code data} is {@code null}
     * @throws IllegalArgumentException if {@code data} is too short to hold both timestamps
     */
    @Override
    public Windowed<T> deserialize(String topic, byte[] data) {
        // A null payload carries no window to decode; propagate it instead of failing.
        if (data == null) {
            return null;
        }
        if (data.length < SUFFIX_SIZE) {
            throw new IllegalArgumentException(
                    "Windowed data must be at least " + SUFFIX_SIZE + " bytes, got " + data.length);
        }

        // Relative reads walk the buffer in the order the fields were written:
        // inner bytes, start timestamp, end timestamp.
        ByteBuffer buf = ByteBuffer.wrap(data);
        byte[] bytes = new byte[data.length - SUFFIX_SIZE];
        buf.get(bytes);
        long start = buf.getLong();
        long end = buf.getLong();

        // Read as a `TimeWindow`.
        // An `UnlimitedWindow` is just a window with an end equal to `Long.MAX_VALUE`.
        // Consumer code should only use the superclass `Window`.
        return new Windowed<T>(inner.deserialize(topic, bytes), new TimeWindow(start, end));
    }

    @Override
    public void close() {
        inner.close();
    }
}
/**
 * Serde for {@code Windowed} values whose window is fully described by its {@code start}
 * and {@code end}, i.e. {@code Window} subclasses that carry no other instance variables
 * than those two.
 *
 * <p>Deserialization always yields a {@code TimeWindow}. An {@code UnlimitedWindow} is a
 * window with {@code end} = Long.MAX_VALUE.
 *
 * <p>We do not use {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer}
 * because it does not serialize the window {@code end}, only the window {@code start}.
 *
 * @param <T> type handled by the wrapped serde
 */
public class TimeWindowedSerde<T> implements Serde<Windowed<T>> {

    private final Serde<Windowed<T>> inner;

    /**
     * Wraps the given serde's serializer and deserializer in their time-windowed counterparts.
     *
     * @param serde serde for the value carried inside the window
     */
    public TimeWindowedSerde(Serde<T> serde) {
        Serializer<Windowed<T>> windowedSerializer =
                new TimeWindowedSerializer<>(serde.serializer());
        Deserializer<Windowed<T>> windowedDeserializer =
                new TimeWindowedDeserializer<>(serde.deserializer());
        this.inner = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
    }

    /** Returns the time-windowed serializer built in the constructor. */
    @Override
    public Serializer<Windowed<T>> serializer() {
        return inner.serializer();
    }

    /** Returns the time-windowed deserializer built in the constructor. */
    @Override
    public Deserializer<Windowed<T>> deserializer() {
        return inner.deserializer();
    }

    /** Forwards the configuration to the serializer first, then to the deserializer. */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer<Windowed<T>> windowedSerializer = inner.serializer();
        windowedSerializer.configure(configs, isKey);

        Deserializer<Windowed<T>> windowedDeserializer = inner.deserializer();
        windowedDeserializer.configure(configs, isKey);
    }

    /** Closes the serializer first, then the deserializer. */
    @Override
    public void close() {
        Serializer<Windowed<T>> windowedSerializer = inner.serializer();
        windowedSerializer.close();

        Deserializer<Windowed<T>> windowedDeserializer = inner.deserializer();
        windowedDeserializer.close();
    }
}
/**
 * Serializer for {@code Windowed<T>} values. The output is the inner key's bytes followed
 * by two 8-byte big-endian timestamps: the window start, then the window end.
 *
 * @param <T> type accepted by the wrapped inner serializer
 */
public class TimeWindowedSerializer<T> implements Serializer<Windowed<T>> {

    /** Size in bytes of one serialized timestamp (a {@code long}). */
    private static final int TIMESTAMP_SIZE = 8;

    private final Serializer<T> inner;

    /**
     * @param inner serializer used for the key carried inside the window
     */
    public TimeWindowedSerializer(Serializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // do nothing
    }

    /**
     * Serializes the windowed key together with its window's start and end.
     *
     * @param topic topic the record is destined for; passed through to the inner serializer
     * @param data  windowed value to serialize, or {@code null}
     * @return inner key bytes + start + end, or {@code null} when {@code data} is {@code null}
     * @throws NullPointerException if the inner serializer returns {@code null} for the key
     */
    @Override
    public byte[] serialize(String topic, Windowed<T> data) {
        // Nothing to encode for a null record; propagate it instead of failing.
        if (data == null) {
            return null;
        }

        byte[] serializedKey = inner.serialize(topic, data.key());
        if (serializedKey == null) {
            // Same exception type the length dereference below would raise, with a cause attached.
            throw new NullPointerException(
                    "Inner serializer returned null for the windowed key on topic " + topic);
        }

        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + TIMESTAMP_SIZE);
        buf.put(serializedKey);
        buf.putLong(data.window().start());
        buf.putLong(data.window().end());
        // The buffer was allocated at exactly the final size, so its backing array is the result.
        return buf.array();
    }

    @Override
    public void close() {
        inner.close();
    }

    /**
     * Serializes only the key inside the window, without the start/end timestamps.
     *
     * @param topic topic passed through to the inner serializer
     * @param data  windowed value whose key is serialized
     * @return whatever the inner serializer produces for {@code data.key()}
     */
    public byte[] serializeBaseKey(String topic, Windowed<T> data) {
        return inner.serialize(topic, data.key());
    }
}
@zetsuno
Copy link

zetsuno commented May 14, 2019

I'm having trouble compiling this due to the TimeWindow class (line 36 of Deserializer). What class is it from?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment