Responsive Sizing Example
package dev.responsive.example;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.util.concurrent.RateLimiter;
import dev.responsive.example.Main.Event.Type;
import dev.responsive.example.Main.OrderProgress.Status;
import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
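// Example Kafka Streams application that tracks order delivery progress: order and
// arrival events are merged into a single stream keyed by order id and aggregated
// into an OrderProgress table, running on ResponsiveKafkaStreams.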
public class Main {

  public static final String ORDERS_TOPIC = "orders";
  public static final String PROGRESS_TOPIC = "progress";
  public static final String OUTPUT_TOPIC = "output";
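
  // Helpers for reading optional overrides from environment variables.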
  private static Integer parseEnv(final String name) {
    final var value = System.getenv(name);
    if (value == null || value.equals("")) {
      return null;
    }
    return Integer.parseInt(value);
  }

  private static String parseEnv(final String name, final String defaultValue) {
    final var value = System.getenv(name);
    if (value == null || value.equals("")) {
      return defaultValue;
    }
    return value;
  }

  private static Integer parseEnv(final String name, int defaultValue) {
    Integer boxed = parseEnv(name);
    if (boxed == null) {
      return defaultValue;
    }
    return boxed;
  }

  private static final Logger LOG = LoggerFactory.getLogger(Main.class);
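
  // Creates the input/output topics (ignoring ones that already exist), builds the
  // topology, and runs it with ResponsiveKafkaStreams until shutdown.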
  public static void main(final String[] args) throws Exception {
    final Properties props = loadConfig();
    props.put("sasl.jaas.config", System.getenv("SASL_JAAS_CONFIG"));
    final Integer eps = parseEnv("STREAMS_EPS");
    final String recordingLevel = parseEnv("RECORDING_LEVEL", "INFO");

    final Admin admin = Admin.create(props);
    try {
      admin.createTopics(List.of(
          new NewTopic(ORDERS_TOPIC, Optional.of(32), Optional.empty()),
          new NewTopic(PROGRESS_TOPIC, Optional.of(32), Optional.empty()),
          new NewTopic(OUTPUT_TOPIC, Optional.of(2), Optional.empty())
      )).all().get();
    } catch (final ExecutionException e) {
      // the topics may already exist from a previous run
      if (!(e.getCause() instanceof TopicExistsException)) {
        throw e;
      }
    }
    admin.close();

    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    final KafkaStreams streams;
    final Map<String, Object> config = new HashMap<>();
    props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, recordingLevel);
    props.forEach((k, v) -> config.put((String) k, v));
    final Topology topology = topology(eps);
    streams = new ResponsiveKafkaStreams(topology, config);

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      executorService.shutdown();
      if (streams != null) {
        streams.close();
      }
    }));
    streams.start();
  }
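
  // Builds the topology: reads orders and arrivals, wraps both in an Event envelope,
  // merges them keyed by order id, and aggregates each order's events into an
  // OrderProgress value. The optional eps argument caps how many events per second
  // are processed.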
  static Topology topology(Integer eps) {
    final StreamsBuilder builder = new StreamsBuilder();

    final KStream<String, Order> orders = builder.stream(
        List.of(ORDERS_TOPIC),
        Consumed.with(
            new Serdes.StringSerde(),
            Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Order.class))
        )
    );
    final KStream<String, Arrival> progress = builder.stream(
        List.of(PROGRESS_TOPIC),
        Consumed.with(
            new Serdes.StringSerde(),
            Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Arrival.class))
        )
    );
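
    // Throttle processing to at most STREAMS_EPS events per second; when unset,
    // the limit is effectively unbounded.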
    final RateLimiter limiter =
        RateLimiter.create(eps == null ? Integer.MAX_VALUE : eps.doubleValue());

    final KStream<String, Event> merged = orders.mapValues(o -> new Event(Type.ORDER, o, null))
        .merge(progress
            .selectKey((k, v) -> v.orderId())
            .mapValues(p -> new Event(Type.ARRIVAL, null, p)));

    final KTable<String, OrderProgress> aggregated = merged
        .mapValues(v -> {
          limiter.acquire();
          return v;
        })
        .groupByKey(Grouped.with(
            new Serdes.StringSerde(),
            Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Event.class))))
        .aggregate(
            () -> new OrderProgress(Status.ORDERED, null, null),
            (k, event, orderProgress) -> {
              if (event.type().equals(Type.ORDER)) {
                // an order event fills in the order details, keeping any progress seen so far
                return new OrderProgress(
                    orderProgress.status(),
                    event.order(),
                    orderProgress.lastLocation()
                );
              } else {
                // an arrival event advances the status and records the latest location
                return new OrderProgress(
                    event.arrival().finalDestination() ? Status.COMPLETED : Status.IN_TRANSIT,
                    orderProgress.order(),
                    event.arrival()
                );
              }
            },
            Materialized.with(
                new Serdes.StringSerde(),
                Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(OrderProgress.class))
            ));

    return builder.build();
  }
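
  // Loads the base Kafka/Streams configuration from app.properties on the classpath.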
  static Properties loadConfig() throws IOException {
    final Properties cfg = new Properties();
    try (InputStream inputStream = Main.class.getResourceAsStream("/app.properties")) {
      cfg.load(inputStream);
    }
    return cfg;
  }
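
  // Aggregated state per order: delivery status, the original order (once seen),
  // and the most recent arrival.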
  record OrderProgress(
      Status status,
      Order order,
      Arrival lastLocation
  ) {
    enum Status {
      ORDERED,
      IN_TRANSIT,
      COMPLETED
    }
  }
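
  // Envelope that lets order and arrival events flow through a single merged stream.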
  record Event(
      Type type,
      Order order,
      Arrival arrival
  ) {
    enum Type {
      ORDER,
      ARRIVAL
    }
  }
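
  // Input records: an Order placed by a user, and an Arrival reported as the order
  // moves through locations (finalDestination marks delivery).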
  record Order(
      String id,
      List<String> items,
      String userId,
      String userEmail,
      Instant time,
      String metadata
  ) { }

  record Arrival(
      String location,
      String orderId,
      Instant time,
      boolean finalDestination
  ) { }
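
  // Minimal Jackson-based JSON serializer/deserializer used for all value types.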
  static class JsonDeserializer<T> implements Deserializer<T> {

    private final Class<T> clazz;

    JsonDeserializer(final Class<T> clazz) {
      this.clazz = clazz;
    }

    @Override
    public T deserialize(final String topic, final byte[] data) {
      try {
        return MAPPER.readValue(data, clazz);
      } catch (final IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  public static class JsonSerializer<T> implements Serializer<T> {

    public JsonSerializer() {
      super();
    }

    @Override
    public byte[] serialize(final String topic, final T data) {
      try {
        return MAPPER.writeValueAsBytes(data);
      } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
      }
    }
  }
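
  // Shared ObjectMapper; JavaTimeModule is required to (de)serialize the Instant fields.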
  private static final ObjectMapper MAPPER = new ObjectMapper();

  static {
    MAPPER.registerModule(new JavaTimeModule());
  }
}
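
Not part of the gist: a minimal sketch of a producer that writes one sample Order to the orders topic, reusing the JsonSerializer defined above. The class name, bootstrap address, and field values are illustrative assumptions; in practice the producer would share the same connection and SASL settings as the streams application.

package dev.responsive.example;

import java.time.Instant;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrderProducerSketch {

  public static void main(final String[] args) {
    final Properties props = new Properties();
    // Placeholder address; reuse the streams app's connection settings in practice.
    props.put("bootstrap.servers", "localhost:9092");

    try (KafkaProducer<String, Main.Order> producer = new KafkaProducer<>(
        props, new StringSerializer(), new Main.JsonSerializer<>())) {
      final Main.Order order = new Main.Order(
          "order-1",
          List.of("item-a", "item-b"),
          "user-1",
          "user@example.com",
          Instant.now(),
          "{}"
      );
      // Key by order id so order and arrival events group together in the aggregation.
      producer.send(new ProducerRecord<>(Main.ORDERS_TOPIC, order.id(), order));
      producer.flush();
    }
  }
}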