Skip to content

Instantly share code, notes, and snippets.

@Danny02
Created May 30, 2022 18:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Danny02/fe5fe620a8f48e84d37c04070594d7db to your computer and use it in GitHub Desktop.
Save Danny02/fe5fe620a8f48e84d37c04070594d7db to your computer and use it in GitHub Desktop.
commandhandler.java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import static java.lang.Boolean.TRUE;
/**
 * Kafka Streams command handler that decides whether a vehicle may be rented out.
 *
 * <p>The handler is a {@link TransformerSupplier}: every transformer it supplies reads and
 * writes the {@value #RENTED_CARS_STORE} state store, which maps a {@link Vin} to a flag
 * telling whether that vehicle is currently rented. The store is declared via
 * {@link #stores()}, so it is connected to the transformer automatically.
 *
 * <p>Input records are keyed by {@link Vin} and carry the command id together with the
 * {@link RentCommand}; output records are keyed by the command id and carry a
 * {@link HandleResult}.
 */
public class RentCommandHandler implements TransformerSupplier<
        RentCommandHandler.Vin,
        KeyValue<UUID, RentCommandHandler.RentCommand>,
        KeyValue<UUID, RentCommandHandler.HandleResult>
        > {

    /** Vehicle identifier used as the key of the rented-cars store. */
    record Vin(String vin7) {
    }

    /** Event emitted when a rent command was accepted. */
    record VehicleIsRentedOut() implements VehicleEvent {
    }

    /** Command asking to rent the vehicle identified by {@code vehicleId}. */
    record RentCommand(Vin vehicleId) {
    }

    /** Outcome of handling one command plus the vehicle events it produced (possibly none). */
    record HandleResult(RentCommandResult result, List<KeyValue<Vin, VehicleEvent>> events) {
    }

    /** Whether a rent command was accepted or rejected. */
    enum RentCommandResult {
        SUCCEEDED, FAILED;
    }

    private static final String RENTED_CARS_STORE = "RENTED_CARS_STORE";

    /**
     * Creates a new transformer instance bound to the rented-cars state store.
     *
     * @return a transformer that rejects a command when the vehicle is already marked as
     *         rented and otherwise marks it as rented and emits a {@link VehicleIsRentedOut}
     */
    @Override
    public Transformer<Vin, KeyValue<UUID, RentCommand>, KeyValue<UUID, HandleResult>> get() {
        return new Transformer<>() {
            private KeyValueStore<Vin, Boolean> rentedCars;

            @Override
            public void init(ProcessorContext context) {
                rentedCars = context.getStateStore(RENTED_CARS_STORE);
            }

            /**
             * Handles a single rent command.
             *
             * @param key     vehicle the command refers to; used for the store lookup, the
             *                store update and the emitted event so that all three agree
             * @param command pair of command id and the command itself
             * @return the command id paired with the handling result
             */
            @Override
            public KeyValue<UUID, HandleResult> transform(Vin key, KeyValue<UUID, RentCommand> command) {
                // A missing entry (null) means the vehicle has never been rented.
                if (TRUE.equals(rentedCars.get(key))) {
                    return new KeyValue<>(
                            command.key,
                            new HandleResult(RentCommandResult.FAILED, List.of()));
                }

                rentedCars.put(key, true);
                var event = new KeyValue<Vin, VehicleEvent>(key, new VehicleIsRentedOut());
                return new KeyValue<>(
                        command.key,
                        new HandleResult(RentCommandResult.SUCCEEDED, List.of(event)));
            }

            @Override
            public void close() {
                // Nothing to release: the state store is managed by Kafka Streams.
            }
        };
    }

    /**
     * Declares the persistent key-value store used by the supplied transformers.
     *
     * @return a single-element set containing the builder for {@value #RENTED_CARS_STORE}
     */
    @Override
    public Set<StoreBuilder<?>> stores() {
        var builder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(RENTED_CARS_STORE),
                new JsonSerde<>(), new JsonSerde<>()
        );
        return Set.of(builder);
    }

    /**
     * Assembles the topology: commands from {@code rent-commands} are re-keyed by vehicle,
     * handled, and the outcome is written to {@code rent-command-results} while the produced
     * vehicle events go to {@code vehicles}.
     *
     * <p>This method only wires the {@link StreamsBuilder}; it does not create or start a
     * Kafka Streams instance.
     */
    public static void main(String[] args) {
        var builder = new StreamsBuilder();
        var results = builder.<UUID, RentCommand>stream("rent-commands")
                .map((uuid, command) -> new KeyValue<>(
                        command.vehicleId(),
                        new KeyValue<UUID, RentCommand>(uuid, command)))
                // transform() never triggers an automatic repartition after a key-changing
                // map(); without this step commands for the same vehicle could be handled
                // by different tasks, each with its own shard of the state store.
                .repartition()
                .transform(new RentCommandHandler());
        results.mapValues(r -> r.result()).to("rent-command-results");
        results.flatMap((k, v) -> v.events()).to("vehicles");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment