Skip to content

Instantly share code, notes, and snippets.

@n1ko-w1ll
Last active November 10, 2017 16:13
Show Gist options
  • Save n1ko-w1ll/af99cdddc36c419f0c5eafb2302f233f to your computer and use it in GitHub Desktop.
Save n1ko-w1ll/af99cdddc36c419f0c5eafb2302f233f to your computer and use it in GitHub Desktop.
Akka Stream Processor - destilled example out of a real world project that demonstrates how to persist the event offset together with the serialized AtLeastOnceDeliverySnapshot
import com.google.protobuf.ByteString;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.event.Logging;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery;
import akka.persistence.AtLeastOnceDelivery;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.serialization.SerializationExtension;
import akka.serialization.Serializer;
import proto.entities.Entities;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
public class StreamProcessor extends AbstractPersistentActorWithAtLeastOnceDelivery {
private static final int SNAPSHOT_INTERVAL = 1000;
private final DiagnosticLoggingAdapter log = Logging.apply(this);
public static Props props() {
return Props.create(StreamProcessor::new);
}
private final Serializer serializer;
private Entities.OffsetStateV1 state = Entities.OffsetStateV1.newBuilder().setOffset(0L).build();
private StreamProcessor() {
this.serializer = SerializationExtension.get(context().system())
.serializerFor(AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot.class);
}
@Override
public String persistenceId() {
return "StreamProcessor";
}
@Override
public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder
.match(OffsetSaved.class, ev -> state = ev.offsetState)
.match(SnapshotOffer.class, s -> {
if (s.snapshot() instanceof Entities.StreamProcessorStateV1) {
final Entities.StreamProcessorStateV1 snapshot = (Entities.StreamProcessorStateV1) s.snapshot();
final Object deliverySnapshot = serializer.fromBinary(
snapshot.getDeliveryState().toByteArray(), AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot.class);
snapshot.getOffsetState();
setDeliverySnapshot((AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot) deliverySnapshot);
}
})
.match(RecoveryCompleted.class, completed -> log.debug("Recovery completed. Offset = {}", state.getOffset()))
.matchAny(o -> log.warning("Received unknown recover message: {}", o))
.build();
}
@Override
public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder
.match(SaveOffset.class, msg -> {
persist(new OffsetSaved(Entities.OffsetStateV1.newBuilder().setOffset(msg.offset).build()), ev -> {
state = ev.offsetState;
if (lastSequenceNr() % SNAPSHOT_INTERVAL == 0 && lastSequenceNr() != 0) {
log.debug("Snapshot interval reached, about to save snapshot...");
final AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot deliverySnapshot = getDeliverySnapshot();
final Entities.StreamProcessorStateV1 snapshot = Entities.StreamProcessorStateV1.newBuilder()
.setOffsetState(state)
.setDeliveryState(ByteString.copyFrom(serializer.toBinary(deliverySnapshot)))
.build();
saveSnapshot(snapshot);
}
});
})
.match(SaveSnapshotSuccess.class, msg -> {
log.debug("Saving snapshot succeeded: {}", msg);
})
.match(SaveSnapshotFailure.class, msg -> {
log.info("Saving snapshot failed: {}", msg);
})
.matchAny(o -> log.info("Received unknown command message: {}", o))
.build();
}
private static final class OffsetSaved {
private final Entities.OffsetStateV1 offsetState;
private OffsetSaved(final Entities.OffsetStateV1 offsetState) {
this.offsetState = offsetState;
}
}
private static final class SaveOffset {
private final long offset;
private SaveOffset(final long offset) {
this.offset = offset;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment