Last active
November 10, 2017 16:13
-
-
Save n1ko-w1ll/af99cdddc36c419f0c5eafb2302f233f to your computer and use it in GitHub Desktop.
Akka Stream Processor - destilled example out of a real world project that demonstrates how to persist the event offset together with the serialized AtLeastOnceDeliverySnapshot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.protobuf.ByteString; | |
import akka.actor.Props; | |
import akka.event.DiagnosticLoggingAdapter; | |
import akka.event.Logging; | |
import akka.japi.pf.ReceiveBuilder; | |
import akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery; | |
import akka.persistence.AtLeastOnceDelivery; | |
import akka.persistence.RecoveryCompleted; | |
import akka.persistence.SaveSnapshotFailure; | |
import akka.persistence.SaveSnapshotSuccess; | |
import akka.persistence.SnapshotOffer; | |
import akka.serialization.SerializationExtension; | |
import akka.serialization.Serializer; | |
import proto.entities.Entities; | |
import scala.PartialFunction; | |
import scala.runtime.BoxedUnit; | |
public class StreamProcessor extends AbstractPersistentActorWithAtLeastOnceDelivery { | |
private static final int SNAPSHOT_INTERVAL = 1000; | |
private final DiagnosticLoggingAdapter log = Logging.apply(this); | |
public static Props props() { | |
return Props.create(StreamProcessor::new); | |
} | |
private final Serializer serializer; | |
private Entities.OffsetStateV1 state = Entities.OffsetStateV1.newBuilder().setOffset(0L).build(); | |
private StreamProcessor() { | |
this.serializer = SerializationExtension.get(context().system()) | |
.serializerFor(AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot.class); | |
} | |
@Override | |
public String persistenceId() { | |
return "StreamProcessor"; | |
} | |
@Override | |
public PartialFunction<Object, BoxedUnit> receiveRecover() { | |
return ReceiveBuilder | |
.match(OffsetSaved.class, ev -> state = ev.offsetState) | |
.match(SnapshotOffer.class, s -> { | |
if (s.snapshot() instanceof Entities.StreamProcessorStateV1) { | |
final Entities.StreamProcessorStateV1 snapshot = (Entities.StreamProcessorStateV1) s.snapshot(); | |
final Object deliverySnapshot = serializer.fromBinary( | |
snapshot.getDeliveryState().toByteArray(), AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot.class); | |
snapshot.getOffsetState(); | |
setDeliverySnapshot((AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot) deliverySnapshot); | |
} | |
}) | |
.match(RecoveryCompleted.class, completed -> log.debug("Recovery completed. Offset = {}", state.getOffset())) | |
.matchAny(o -> log.warning("Received unknown recover message: {}", o)) | |
.build(); | |
} | |
@Override | |
public PartialFunction<Object, BoxedUnit> receiveCommand() { | |
return ReceiveBuilder | |
.match(SaveOffset.class, msg -> { | |
persist(new OffsetSaved(Entities.OffsetStateV1.newBuilder().setOffset(msg.offset).build()), ev -> { | |
state = ev.offsetState; | |
if (lastSequenceNr() % SNAPSHOT_INTERVAL == 0 && lastSequenceNr() != 0) { | |
log.debug("Snapshot interval reached, about to save snapshot..."); | |
final AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot deliverySnapshot = getDeliverySnapshot(); | |
final Entities.StreamProcessorStateV1 snapshot = Entities.StreamProcessorStateV1.newBuilder() | |
.setOffsetState(state) | |
.setDeliveryState(ByteString.copyFrom(serializer.toBinary(deliverySnapshot))) | |
.build(); | |
saveSnapshot(snapshot); | |
} | |
}); | |
}) | |
.match(SaveSnapshotSuccess.class, msg -> { | |
log.debug("Saving snapshot succeeded: {}", msg); | |
}) | |
.match(SaveSnapshotFailure.class, msg -> { | |
log.info("Saving snapshot failed: {}", msg); | |
}) | |
.matchAny(o -> log.info("Received unknown command message: {}", o)) | |
.build(); | |
} | |
private static final class OffsetSaved { | |
private final Entities.OffsetStateV1 offsetState; | |
private OffsetSaved(final Entities.OffsetStateV1 offsetState) { | |
this.offsetState = offsetState; | |
} | |
} | |
private static final class SaveOffset { | |
private final long offset; | |
private SaveOffset(final long offset) { | |
this.offset = offset; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment