Skip to content

Instantly share code, notes, and snippets.

@rodesai
Created October 23, 2023 05:59
Show Gist options
  • Save rodesai/56663a8c517127f89d7cfd2f4a238833 to your computer and use it in GitHub Desktop.
Save rodesai/56663a8c517127f89d7cfd2f4a238833 to your computer and use it in GitHub Desktop.
Responsive Sizing Example
package dev.responsive.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.util.concurrent.RateLimiter;
import dev.responsive.example.Main2.Event.Type;
import dev.responsive.example.Main2.OrderProgress.Status;
import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.rocksdb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Main2 {
/** Input topic that {@code topology} consumes as JSON-encoded {@code Order} values. */
public static final String ORDERS_TOPIC = "orders";
/** Input topic that {@code topology} consumes as JSON-encoded {@code Arrival} values. */
public static final String PROGRESS_TOPIC = "progress";
/**
 * Topic that {@code main} asks the broker to create (2 partitions).
 * NOTE(review): nothing visible in this file produces to it — confirm whether that is intended.
 */
public static final String OUTPUT_TOPIC = "output";
/**
 * Reads an integer from the environment variable {@code name}.
 *
 * @param name the environment variable to read
 * @return the parsed integer, or {@code null} when the variable is unset or blank
 * @throws NumberFormatException if the variable is set but does not hold a valid integer; the
 *     message names the variable so a misconfiguration can be located
 */
private static Integer parseEnv(final String name) {
  final String raw = System.getenv(name);
  if (raw == null || raw.isBlank()) {
    return null;
  }
  try {
    // Surrounding whitespace is a common artifact of env files; it carries no meaning here.
    return Integer.parseInt(raw.strip());
  } catch (final NumberFormatException e) {
    final NumberFormatException detailed = new NumberFormatException(
        "Environment variable " + name + " must be an integer but was \"" + raw + "\"");
    detailed.initCause(e);
    throw detailed;
  }
}
/**
 * Reads a string from the environment variable {@code name}.
 *
 * @param name the environment variable to read
 * @param defaultValue the value to return when the variable is unset or empty
 * @return the variable's value, or {@code defaultValue} when it is unset or empty
 */
private static String parseEnv(final String name, final String defaultValue) {
  final String raw = System.getenv(name);
  final boolean present = raw != null && !raw.isEmpty();
  return present ? raw : defaultValue;
}
/**
 * Reads an integer from the environment variable {@code name}, with a fallback.
 *
 * @param name the environment variable to read
 * @param defaultValue the value to return when the single-argument overload yields {@code null}
 * @return the parsed integer, or {@code defaultValue} when none is configured
 */
private static Integer parseEnv(final String name, int defaultValue) {
  final Integer parsed = parseEnv(name);
  // Both branches are Integer so the parsed value is returned without an unbox/rebox round trip.
  return parsed != null ? parsed : Integer.valueOf(defaultValue);
}
/** Logger for this class (previously created for {@code Main}, which mislabelled its output). */
private static final Logger LOG = LoggerFactory.getLogger(Main2.class);
/**
 * Application entry point: loads {@code /app.properties}, applies environment overrides, makes a
 * best-effort attempt to create the topics, then builds and starts the streams application.
 *
 * <p>Environment variables read: {@code SASL_JAAS_CONFIG} (optional override for
 * {@code sasl.jaas.config}), {@code STREAMS_EPS} (optional rate limit passed to
 * {@link #topology(Integer)}), and {@code RECORDING_LEVEL} (metrics recording level, default
 * {@code INFO}).
 *
 * @param args ignored
 * @throws Exception if the configuration cannot be loaded or startup is interrupted
 */
public static void main(final String[] args) throws Exception {
  final Properties props = loadConfig();
  // Properties is a Hashtable and rejects null values, so only override when the variable is set.
  final String jaasConfig = System.getenv("SASL_JAAS_CONFIG");
  if (jaasConfig != null && !jaasConfig.isEmpty()) {
    props.put("sasl.jaas.config", jaasConfig);
  }
  final Integer eps = parseEnv("STREAMS_EPS");
  final String recordingLevel = parseEnv("RECORDING_LEVEL", "INFO");

  ensureTopics(props);

  props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, recordingLevel);
  final Map<String, Object> config = new HashMap<>();
  props.forEach((k, v) -> config.put((String) k, v));

  final KafkaStreams streams = new ResponsiveKafkaStreams(topology(eps), config);
  Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  streams.start();
}

/**
 * Best-effort creation of the topics used by this example.
 *
 * <p>Waits for the broker's answer for every topic and closes the admin client afterwards. A
 * topic that already exists is expected; any other failure is logged and startup continues, so
 * environments with pre-created topics keep working.
 *
 * @param props client configuration used to create the admin client
 * @throws InterruptedException if the thread is interrupted while waiting for the broker
 */
private static void ensureTopics(final Properties props) throws InterruptedException {
  final List<NewTopic> topics = List.of(
      new NewTopic(ORDERS_TOPIC, Optional.of(32), Optional.empty()),
      new NewTopic(PROGRESS_TOPIC, Optional.of(32), Optional.empty()),
      new NewTopic(OUTPUT_TOPIC, Optional.of(2), Optional.empty()));
  try (Admin admin = Admin.create(props)) {
    // Creation errors are reported per topic through futures, never thrown by createTopics itself.
    for (final var creation : admin.createTopics(topics).values().entrySet()) {
      try {
        creation.getValue().get();
      } catch (final ExecutionException e) {
        if (e.getCause() instanceof TopicExistsException) {
          LOG.debug("Topic {} already exists", creation.getKey());
        } else {
          LOG.warn("Could not create topic {}", creation.getKey(), e);
        }
      }
    }
  }
}
/**
 * Builds the topology: order and arrival records are merged into a single {@code Event} stream
 * keyed by order id, rate limited, and aggregated into one {@code OrderProgress} per key.
 *
 * <p>NOTE(review): the aggregated table is only materialized; nothing is written to
 * {@link #OUTPUT_TOPIC} here — confirm that this is intended.
 *
 * @param eps maximum number of events per second admitted into the aggregation, or {@code null}
 *     for an effectively unlimited rate ({@code Integer.MAX_VALUE} permits per second)
 * @return the built topology
 */
static Topology topology(Integer eps) {
  final StreamsBuilder builder = new StreamsBuilder();
  final KStream<String, Order> orders = builder.stream(
      List.of(ORDERS_TOPIC),
      Consumed.with(Serdes.String(), jsonSerde(Order.class)));
  final KStream<String, Arrival> progress = builder.stream(
      List.of(PROGRESS_TOPIC),
      Consumed.with(Serdes.String(), jsonSerde(Arrival.class)));

  // One limiter instance is captured by the mapValues lambda below and shared by all its calls.
  final RateLimiter limiter =
      RateLimiter.create(eps == null ? Integer.MAX_VALUE : eps.doubleValue());

  // Arrivals are re-keyed by order id so both inputs aggregate under the same key.
  final KStream<String, Event> merged = orders
      .mapValues(o -> new Event(Type.ORDER, o, null))
      .merge(progress
          .selectKey((k, v) -> v.orderId())
          .mapValues(p -> new Event(Type.ARRIVAL, null, p)));

  merged
      .mapValues(v -> {
        limiter.acquire();
        return v;
      })
      .groupByKey(Grouped.with(Serdes.String(), jsonSerde(Event.class)))
      .aggregate(
          () -> new OrderProgress(Status.ORDERED, null, null),
          (k, event, orderProgress) -> applyEvent(orderProgress, event),
          Materialized.with(Serdes.String(), jsonSerde(OrderProgress.class)));
  return builder.build();
}

/** Creates a JSON serde for {@code type} backed by this file's JsonSerializer/JsonDeserializer. */
private static <T> Serde<T> jsonSerde(final Class<T> type) {
  return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(type));
}

/**
 * Folds one event into the current progress of an order.
 *
 * <p>An ORDER event replaces the stored order and keeps status and last location; any other event
 * is treated as an arrival, which replaces the last location and sets the status to COMPLETED at
 * the final destination and IN_TRANSIT otherwise.
 */
private static OrderProgress applyEvent(final OrderProgress current, final Event event) {
  if (event.type() == Type.ORDER) {
    return new OrderProgress(current.status(), event.order(), current.lastLocation());
  }
  final Arrival arrival = event.arrival();
  final Status status = arrival.finalDestination() ? Status.COMPLETED : Status.IN_TRANSIT;
  return new OrderProgress(status, current.order(), arrival);
}
/**
 * Loads the application configuration from the classpath resource {@code /app.properties}.
 *
 * @return the loaded properties
 * @throws IOException if the resource is missing from the classpath or cannot be read
 */
static Properties loadConfig() throws IOException {
  final Properties cfg = new Properties();
  try (InputStream in = Main2.class.getResourceAsStream("/app.properties")) {
    // getResourceAsStream signals a missing resource with null rather than an exception.
    if (in == null) {
      throw new IOException("Classpath resource /app.properties not found");
    }
    cfg.load(in);
  }
  return cfg;
}
/**
 * Aggregate value kept per key by the {@code aggregate} step in {@code topology}.
 *
 * @param status initialised to {@code ORDERED}; an arrival event sets it to {@code COMPLETED}
 *     when the arrival is flagged as final destination and to {@code IN_TRANSIT} otherwise
 * @param order payload of the most recent order event, {@code null} until one is aggregated
 * @param lastLocation most recent arrival aggregated, {@code null} until one is aggregated
 */
record OrderProgress(
Status status,
Order order,
Arrival lastLocation
) {
/** Progress states assigned by the aggregation in {@code topology}. */
enum Status {
ORDERED,
IN_TRANSIT,
COMPLETED
}
}
/**
 * Common envelope that lets order and arrival records travel on one merged stream.
 *
 * @param type which of the two payloads this event carries
 * @param order set when {@code topology} wraps an order record; {@code null} for arrivals
 * @param arrival set when {@code topology} wraps an arrival record; {@code null} for orders
 */
record Event(
Type type,
Order order,
Arrival arrival
) {
/** Discriminator for the payload carried by an {@code Event}. */
enum Type {
ORDER,
ARRIVAL
}
}
/**
 * Value type that {@code topology} deserializes from JSON on the orders topic.
 *
 * NOTE(review): field meanings below are inferred from their names only — nothing in this file
 * reads them individually; confirm against the producer of the topic.
 *
 * @param id presumably the order identifier
 * @param items presumably the ordered items
 * @param userId presumably the ordering user's identifier
 * @param userEmail presumably the ordering user's e-mail address
 * @param time presumably when the order was placed
 * @param metadata free-form string; contents not specified in this file
 */
record Order(
String id,
List<String> items,
String userId,
String userEmail,
Instant time,
String metadata
) { }
/**
 * Value type that {@code topology} deserializes from JSON on the progress topic.
 *
 * @param location presumably where the shipment arrived — not read in this file
 * @param orderId used by {@code topology} as the new record key, so arrivals join their order
 * @param time presumably when the arrival happened — not read in this file
 * @param finalDestination when {@code true} the aggregation marks the order {@code COMPLETED},
 *     otherwise {@code IN_TRANSIT}
 */
record Arrival(
String location,
String orderId,
Instant time,
boolean finalDestination
) { }
/**
 * Kafka deserializer that reads JSON bytes into instances of a fixed class using the shared
 * {@code MAPPER}.
 *
 * @param <T> the type produced
 */
static class JsonDeserializer<T> implements Deserializer<T> {
  private final Class<T> clazz;

  /**
   * @param clazz the class every payload is mapped to
   */
  JsonDeserializer(final Class<T> clazz) {
    this.clazz = clazz;
  }

  /**
   * Deserializes one record payload.
   *
   * @param topic the topic the record came from; used only in the error message
   * @param data the raw payload, may be {@code null}
   * @return the mapped object, or {@code null} when {@code data} is {@code null}
   * @throws RuntimeException wrapping the underlying {@link IOException} when the payload cannot
   *     be mapped to the target class
   */
  @Override
  public T deserialize(final String topic, final byte[] data) {
    // A null payload (e.g. a tombstone) has no JSON to parse; pass it through as null.
    if (data == null) {
      return null;
    }
    try {
      return MAPPER.readValue(data, clazz);
    } catch (final IOException e) {
      throw new RuntimeException(
          "Failed to deserialize " + clazz.getName() + " from topic " + topic, e);
    }
  }
}
/**
 * Kafka serializer that writes values as JSON bytes using the shared {@code MAPPER}.
 *
 * <p>Relies on the implicit public no-argument constructor.
 *
 * @param <T> the type consumed
 */
public static class JsonSerializer<T> implements Serializer<T> {

  /**
   * Serializes one record payload.
   *
   * @param topic the topic the record is destined for; used only in the error message
   * @param data the value to write, may be {@code null}
   * @return the JSON bytes, or {@code null} when {@code data} is {@code null}
   * @throws RuntimeException wrapping the underlying {@link JsonProcessingException} when the
   *     value cannot be written as JSON
   */
  @Override
  public byte[] serialize(final String topic, final T data) {
    // Keep null as a real Kafka null instead of the four-byte JSON literal "null".
    if (data == null) {
      return null;
    }
    try {
      return MAPPER.writeValueAsBytes(data);
    } catch (final JsonProcessingException e) {
      throw new RuntimeException("Failed to serialize value for topic " + topic, e);
    }
  }
}
private static final ObjectMapper MAPPER = new ObjectMapper();
static {
MAPPER.registerModule(new JavaTimeModule());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment