Created
May 30, 2022 18:45
-
-
Save Danny02/fe5fe620a8f48e84d37c04070594d7db to your computer and use it in GitHub Desktop.
commandhandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import static java.lang.Boolean.TRUE;
public class RentCommandHandler implements TransformerSupplier< | |
RentCommandHandler.Vin, | |
KeyValue<UUID, RentCommandHandler.RentCommand>, | |
KeyValue<UUID, RentCommandHandler.HandleResult> | |
> { | |
record Vin(String vin7) { | |
} | |
record VehicleIsRentedOut() implements VehicleEvent { | |
} | |
record RentCommand(Vin vehicleId) { | |
} | |
record HandleResult(RentCommandResult result, List<KeyValue<Vin, VehicleEvent>> events) {} | |
enum RentCommandResult { | |
SUCCEEDED, FAILED; | |
} | |
private static final String RENTED_CARS_STORE = "RENTED_CARS_STORE"; | |
@Override | |
public Transformer<Vin, KeyValue<UUID, RentCommand>, KeyValue<UUID, HandleResult>> get() { | |
return new Transformer<>() { | |
private KeyValueStore<Vin, Boolean> rentedCars; | |
@Override | |
public void init(ProcessorContext context) { | |
rentedCars = context.getStateStore(RENTED_CARS_STORE); | |
} | |
@Override | |
public KeyValue<UUID, HandleResult> transform(Vin key, KeyValue<UUID, RentCommand> command) { | |
var isRented = rentedCars.get(key); | |
if(TRUE.equals(isRented)) { | |
return new KeyValue<>(command.key, new HandleResult(FAILED, List.of())); | |
} else { | |
rentedCars.put(command.value.vehicleId(), true); | |
var event = new KeyValue<Vin, VehicleEvent>(command.value.vehicleId(), new VehicleIsRentedOut()); | |
return new KeyValue<>(command.key, new HandleResult(SUCCEEDED, List.of(event))); | |
} | |
} | |
@Override | |
public void close() { | |
} | |
}; | |
} | |
@Override | |
public Set<StoreBuilder<?>> stores() { | |
var builder = Stores.keyValueStoreBuilder( | |
Stores.persistentKeyValueStore(RENTED_CARS_STORE), | |
new JsonSerde<>(), new JsonSerde<>() | |
); | |
return Set.of(builder); | |
} | |
public static void main(String[] args) { | |
var builder = new StreamsBuilder(); | |
var results = builder.<UUID, RentCommand>stream("rent-commands") | |
.map((uuid, command) -> new KeyValue<>(command.vehicleId, new KeyValue(uuid, command))) | |
.transform(new RentCommandHandler()); | |
results.mapValues(r -> r.result).to("rent-command-results"); | |
results.flatMap((k, v) -> v.events).to("vehicles"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment