Simpler KStream API Example: See KafkaController.java
// Aggregate.java
import org.apache.kafka.streams.kstream.Aggregator;

/**
 * Wraps a typed Aggregator so it can run against KafkaEvent-carrying streams,
 * deserializing both the new value and the running aggregate to T before applying.
 */
public class Aggregate<T> implements Aggregator<Long, KafkaEvent, KafkaEvent> {

    private final Class<T> clazz;
    private final Aggregator<Long, T, T> aggregation;

    public Aggregate(Class<T> clazz, Aggregator<Long, T, T> aggregation) {
        this.clazz = clazz;
        this.aggregation = aggregation;
    }

    @Override
    public KafkaEvent apply(Long aggKey, KafkaEvent value, KafkaEvent aggregate) {
        return KafkaEvent.withPayload(aggregation.apply(aggKey, value.to(clazz), aggregate.to(clazz)));
    }
}
// Filter.java
import org.apache.kafka.streams.kstream.Predicate;

import java.util.function.Function;

/**
 * Wraps a typed condition as a Kafka Streams Predicate over KafkaEvent,
 * deserializing the payload to T before testing it.
 */
public class Filter<T> implements Predicate<Long, KafkaEvent> {

    private final Class<T> clazz;
    private final Function<T, Boolean> condition;

    public Filter(Class<T> clazz, Function<T, Boolean> condition) {
        this.clazz = clazz;
        this.condition = condition;
    }

    @Override
    public boolean test(Long key, KafkaEvent value) {
        return condition.apply(value.to(clazz));
    }
}
// KafkaController.java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.stereotype.Component;

// DomainEvent, DomainEventType, and DomainEntity are the application's own
// domain types and are not shown in this gist.
@Component
public class KafkaController {

    @Configuration
    @EnableKafka
    @EnableKafkaStreams
    public static class KafkaStreamsConfiguration {

        //... Kafka Streams Producer/Consumer Initialization ...

        /**
         * A simplified example of abstract command/pipe operators that wrap the KStream API. The KafkaEvent
         * class is a serializable container that standardizes the input and output of data types on KTables
         * and KStreams. The StreamCommands<T> class provides wrapped pipeline operators that automatically
         * deserialize and serialize the entities embedded in the KafkaEvent container. The KafkaEventSerde
         * is the standard input/output serde that embeds any serializable object. The entire goal here is
         * to abstract away serialization and make the pipe operations on Kafka Streams more readable.
         */
        @Bean
        public KStream<Long, KafkaEvent> movieStream(KStreamBuilder builder) {
            KStream<Long, KafkaEvent> pipe = builder.stream(Serdes.Long(), new KafkaEventSerde(), "movie-stream");

            // Wraps the functional KStream internals and provides generic type inference and serialization
            StreamCommands<DomainEvent> commands = new StreamCommands<>(DomainEvent.class);

            // Imagine the possibilities...
            pipe.filter(commands.filter(event -> event.getType() == DomainEventType.READY))
                    .map(commands.map((id, event) -> {
                        DomainEntity payload = event.getPayload();
                        if (payload != null)
                            payload.setState(DomainEventType.PLAYING);
                        return event;
                    })).to(Serdes.Long(), new KafkaEventSerde(), "play-stream");

            // Would be even better if KStream<K, V> supported StreamCommands<T> operators directly;
            // see the TypedStream sketch after StreamCommands below.
            return pipe;
        }
    }
}
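The movieStream bean exercises the filter and map wrappers but not Aggregate. As a hedged sketch, not part of the original gist, here is how it could plug into the KGroupedStream API from the same Kafka 0.10.x era as KStreamBuilder above; PlayCount is a hypothetical serializable POJO with an increment() method returning PlayCount.

    // Hypothetical PlayCount payload type; everything else reuses the gist's classes.
    StreamCommands<PlayCount> counting = new StreamCommands<>(PlayCount.class);

    KTable<Long, KafkaEvent> playCounts = pipe
            .groupByKey(Serdes.Long(), new KafkaEventSerde())
            .aggregate(
                    () -> KafkaEvent.withPayload(new PlayCount()),                        // initial aggregate
                    counting.aggregate((key, value, aggregate) -> aggregate.increment()), // typed Aggregate<PlayCount>
                    new KafkaEventSerde(),
                    "play-count-store");

Because Aggregate<PlayCount> implements Aggregator<Long, KafkaEvent, KafkaEvent>, the initializer and the aggregate value serde both operate on the KafkaEvent container, while the lambda sees only typed PlayCount instances.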
// KafkaEvent.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;

public class KafkaEvent implements Serializable {

    // ObjectMapper is not Serializable, so it must not be an instance field on a
    // Serializable class; a single shared, pre-configured mapper also keeps every
    // constructor consistent (the original configured it only in the no-arg constructor).
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    private byte[] bytes;
    private Long id;

    public KafkaEvent() {
    }

    public KafkaEvent(byte[] bytes, Long id) {
        this.bytes = bytes;
        this.id = id;
    }

    public KafkaEvent(byte[] bytes) {
        this.bytes = bytes;
    }

    public KafkaEvent(Object payload) {
        try {
            this.bytes = OBJECT_MAPPER.writeValueAsBytes(payload);
        } catch (JsonProcessingException e) {
            // Rethrow instead of swallowing: a payload that cannot be serialized
            // would otherwise silently produce an empty event.
            throw new RuntimeException(e);
        }
    }

    public KafkaEvent(Long id, Object payload) {
        this(payload);
        this.id = id;
    }

    public byte[] getBytes() {
        return bytes;
    }

    public void setBytes(byte[] bytes) {
        this.bytes = bytes;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    // Deserializes the embedded payload back to its typed form
    public <T> T to(Class<T> clazz) {
        T result = null;
        try {
            if (bytes != null && bytes.length > 0)
                result = OBJECT_MAPPER.readValue(this.bytes, clazz);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return result;
    }

    public <T> String toString(Class<T> clazz) {
        try {
            return OBJECT_MAPPER.writeValueAsString(to(clazz)).trim();
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    // Uses the payload's hashCode as a synthetic key when no key is supplied
    public static KafkaEvent withPayload(Object object) {
        return new KafkaEvent(Long.valueOf(object.hashCode()), object);
    }

    public static KafkaEvent withPayload(Long key, Object object) {
        return new KafkaEvent(key, object);
    }

    @Override
    public String toString() {
        if (bytes != null) {
            return ("{\"id\":" + id + ",\"message\":" + toString(Object.class) + "}").trim();
        }
        return super.toString();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        KafkaEvent that = (KafkaEvent) o;
        if (!Arrays.equals(bytes, that.bytes)) return false;
        return id != null ? id.equals(that.id) : that.id == null;
    }

    @Override
    public int hashCode() {
        int result = Arrays.hashCode(bytes);
        result = 31 * result + (id != null ? id.hashCode() : 0);
        return result;
    }
}
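A quick round trip shows the container in isolation; Movie here is a hypothetical serializable POJO, not a class from the gist.

    // Hypothetical Movie POJO with a no-arg constructor and a title property.
    Movie movie = new Movie("The Matrix");

    KafkaEvent event = KafkaEvent.withPayload(1L, movie);   // serialize into the container
    Movie restored = event.to(Movie.class);                 // Jackson round trip back to the typed form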
// KafkaEventSerde.java
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.Map;

public class KafkaEventSerde implements Serde<KafkaEvent> {

    private final JsonSerializer<KafkaEvent> serializer = new JsonSerializer<>();
    private final JsonDeserializer<KafkaEvent> deserializer = new JsonDeserializer<>(KafkaEvent.class);

    public KafkaEventSerde() {
        serializer.configure(null, false);
        deserializer.configure(null, false);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        this.serializer.configure(configs, isKey);
        this.deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        this.serializer.close();
        this.deserializer.close();
    }

    @Override
    public Serializer<KafkaEvent> serializer() {
        return this.serializer;
    }

    @Override
    public Deserializer<KafkaEvent> deserializer() {
        return this.deserializer;
    }
}
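To sanity-check the serde outside a topology, a direct pass through the standard Serializer/Deserializer contract works; the topic name is only illustrative and somePayload stands in for any serializable object.

    KafkaEventSerde serde = new KafkaEventSerde();

    KafkaEvent event = KafkaEvent.withPayload(42L, somePayload);
    byte[] wire = serde.serializer().serialize("movie-stream", event);           // JSON bytes on the wire
    KafkaEvent decoded = serde.deserializer().deserialize("movie-stream", wire); // back to the container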
// Mapper.java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;

/**
 * Wraps a typed KeyValueMapper so the mapped result is re-embedded in a
 * KafkaEvent, keeping serialization out of the caller's lambda.
 */
public class Mapper<V, R> implements KeyValueMapper<Long, KafkaEvent, KeyValue<Long, KafkaEvent>> {

    private final Class<V> clazz;
    private final KeyValueMapper<Long, V, R> mapper;

    public Mapper(Class<V> clazz, KeyValueMapper<Long, V, R> mapper) {
        this.clazz = clazz;
        this.mapper = mapper;
    }

    @Override
    public KeyValue<Long, KafkaEvent> apply(Long key, KafkaEvent value) {
        return new KeyValue<>(key, KafkaEvent.withPayload(mapper.apply(key, value.to(clazz))));
    }
}
// StreamCommands.java
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.KeyValueMapper;

import java.util.function.Function;

/**
 * Factory for the typed operator wrappers; holds the payload class once so
 * each pipeline step can deserialize without repeating it.
 */
public class StreamCommands<T> {

    private final Class<T> clazz;

    public StreamCommands(Class<T> clazz) {
        this.clazz = clazz;
    }

    public Filter<T> filter(Function<T, Boolean> predicate) {
        return new Filter<>(clazz, predicate);
    }

    public Aggregate<T> aggregate(Aggregator<Long, T, T> aggregator) {
        return new Aggregate<>(clazz, aggregator);
    }

    public Mapper<T, ?> map(KeyValueMapper<Long, T, ?> mapper) {
        return new Mapper<>(clazz, mapper);
    }
}
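The comment in movieStream wishes KStream<K, V> accepted StreamCommands<T> operators directly. A minimal decorator sketch, not part of the gist and only covering filter, suggests what that could look like against the same API:

    // TypedStream.java (hypothetical)
    import org.apache.kafka.streams.kstream.KStream;

    import java.util.function.Function;

    public class TypedStream<T> {

        private final KStream<Long, KafkaEvent> stream;
        private final Class<T> clazz;
        private final StreamCommands<T> commands;

        public TypedStream(KStream<Long, KafkaEvent> stream, Class<T> clazz) {
            this.stream = stream;
            this.clazz = clazz;
            this.commands = new StreamCommands<>(clazz);
        }

        // Chains the wrapped Filter without exposing KafkaEvent to the caller's lambda
        public TypedStream<T> filter(Function<T, Boolean> condition) {
            return new TypedStream<>(stream.filter(commands.filter(condition)), clazz);
        }

        // Escape hatch back to the raw KStream for terminal operations like to()
        public KStream<Long, KafkaEvent> unwrap() {
            return stream;
        }
    }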