Created
May 20, 2018 07:29
-
-
Save object/0cd39086ec7758e6a0d3d2a510894ccb to your computer and use it in GitHub Desktop.
Akkling persistence example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1. XML configuration section (HOCON embedded in CDATA) | |
<?xml version="1.0" encoding="utf-8"?> | |
<!--
  Akka.Persistence settings, written as HOCON inside the CDATA section below.

  journal:        selects the "sql-server" journal plugin, implemented by
                  BatchingSqlServerJournal, storing events in dbo.EventJournal.
  event adapters: "json-adapter" (PersistenceUtils+EventAdapter in the Upload assembly)
                  is bound to System.Object for writes ("to journal") and to
                  Newtonsoft JObject for reads ("from journal").
  snapshot-store: selects the "sql-server" snapshot plugin, implemented by
                  SqlServerSnapshotStore, using the hyperion serializer and
                  storing snapshots in dbo.SnapshotStore.
  query.journal.sql: sets max-buffer-size to 10000.

  NOTE(review): auto-initialize is off for both plugins, so presumably the tables
  must be created beforehand; confirm against the deployment scripts.
  NOTE(review): no connection string appears in this section; presumably it is
  supplied elsewhere; confirm.
-->
<akka.persistence> | |
<hocon> | |
<![CDATA[ | |
akka { | |
persistence{ | |
query.journal.sql { | |
max-buffer-size = 10000 | |
} | |
journal { | |
plugin = "akka.persistence.journal.sql-server" | |
sql-server { | |
class = "Akka.Persistence.SqlServer.Journal.BatchingSqlServerJournal, Akka.Persistence.SqlServer" | |
schema-name = dbo | |
table-name = EventJournal | |
auto-initialize = off | |
event-adapters { | |
json-adapter = "Nrk.Oddjob.Upload.PersistenceUtils+EventAdapter, Upload" | |
} | |
event-adapter-bindings { | |
# to journal | |
"System.Object, mscorlib" = json-adapter | |
# from journal | |
"Newtonsoft.Json.Linq.JObject, Newtonsoft.Json" = [json-adapter] | |
} | |
} | |
} | |
snapshot-store { | |
plugin = "akka.persistence.snapshot-store.sql-server" | |
sql-server { | |
class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer" | |
serializer = hyperion | |
schema-name = dbo | |
table-name = SnapshotStore | |
auto-initialize = off | |
} | |
} | |
} | |
} | |
]]> | |
</hocon> | |
</akka.persistence> | |
2. Event adapter | |
/// Converts a journaled JObject back into a typed event.
/// The JObject is expected to carry the event's type name under the `eventType` property
/// (written by EventAdapter.ToJournal). When that property is missing, or the named type
/// cannot be resolved by Type.GetType, the JObject itself is returned (boxed) unchanged.
/// Throws InvalidCastException if `evt` is not a JObject.
let private deserializeEvent (evt : obj) =
    let jObject = evt :?> Newtonsoft.Json.Linq.JObject
    let typeToken = jObject.[eventType]
    let resolvedType =
        match typeToken with
        | null -> null
        | token -> Type.GetType(token.ToString())
    if isNull resolvedType then
        // Unknown or absent type information: hand the raw JSON back to the caller.
        box jObject
    else
        jObject.ToObject(resolvedType)
/// Akka.Persistence event adapter that stores events as JSON objects.
/// On the way to the journal each event is converted to a JObject, prefixed with a property
/// (named by `eventType`) holding the event's full type name, and tagged with `anyEventTag`.
/// On the way back, JObject payloads are turned into typed events via deserializeEvent;
/// any other payload yields an empty event sequence.
type EventAdapter(__ : Akka.Actor.ExtendedActorSystem) =

    interface Akka.Persistence.Journal.IEventAdapter with

        /// The manifest is always "<JObject full name>,<JObject assembly name>", regardless of the event.
        member __.Manifest(_ : obj) =
            let jObjectType = typeof<Newtonsoft.Json.Linq.JObject>
            let assemblyName = jObjectType.Assembly.GetName().Name
            sprintf "%s,%s" jObjectType.FullName assemblyName

        /// Serializes the event to a JObject, records its type name as the first property and wraps it in Tagged.
        member __.ToJournal(evt : obj) : obj =
            let payload = Newtonsoft.Json.Linq.JObject.FromObject(evt)
            let typeProperty = Newtonsoft.Json.Linq.JProperty(eventType, evt.GetType().FullName)
            payload.AddFirst(typeProperty)
            Akka.Persistence.Journal.Tagged(box payload, [| anyEventTag |]) :> obj

        /// Restores a single typed event from a JObject payload; non-JObject payloads are dropped.
        member __.FromJournal(evt : obj, _ : string) : Akka.Persistence.Journal.IEventSequence =
            match evt with
            | :? Newtonsoft.Json.Linq.JObject ->
                Akka.Persistence.Journal.EventSequence.Single(deserializeEvent evt)
            | _ ->
                Akka.Persistence.Journal.EventSequence.Empty
3. Persistent actor templates that we use for most of our actors | |
[<AutoOpen>]
module PersistenceActorTemplates =

    open Akka
    open Akka.Persistence
    open Akkling
    open Akkling.Persistence
    open Nrk.Oddjob.Core
    open Nrk.Oddjob.Core.ActorUtils

    /// Message a persistent actor sends to itself when enough events have accumulated;
    /// on receipt the actor asks its snapshot store to save the current state.
    type TakeSnapshotCommand() = class end

    /// Result of a command handler: tells the template what to do next.
    type PersistentActorContinuation<'memEvent, 'memState> =
        /// Persist a single event (converted to its db representation).
        | Store of 'memEvent
        /// Persist several events (each converted to its db representation).
        | StoreAll of 'memEvent list
        /// Persist nothing and keep the current state.
        | Loop
        /// Only used in originAssetAssignmentActor for reasons explained there. Please don't use it.
        | LoopWithExtendedState of 'memState

    /// Number of applied events after which a snapshot is requested.
    let [<Literal>] SnapshotFrequency = 100

    (*
        This function is used to create implementations of persistent actors.
        'dbState and 'dbEvent live in database. They should be treated with care since it is hard to change them.
        'memState and 'memEvent live in memory of the actor, and can be changed easily.
        State of the actor (both in db and memory) is constructed from incoming events.

        Notable functions:
        - handle: receives current state and incoming command, handles it and returns information for the persistent layer how to proceed
        - updateState: used when actor is restored from db (after restart or after storing a new event). Takes state and event and produces new state.

        Other parameters:
        - initialState: state the actor starts with before any snapshot or event is applied
        - memState2db / dbState2mem: conversions between in-memory state and the snapshot representation
        - memEvent2db / dbEvent2mem: conversions between in-memory events and the journal representation
        - onRecoveryCompleted: called with the recovered state when RecoveryCompleted is received

        Snapshots: every applied event (replayed or freshly persisted) increments a counter; each time the counter
        reaches a multiple of SnapshotFrequency the actor sends itself a TakeSnapshotCommand. A successful snapshot
        decreases the counter by SnapshotFrequency; a failed one is only logged.
    *)
    let persistentActorTemplate<'memState, 'dbState, 'memEvent, 'dbEvent, 'cmd>
        (handle : 'memState -> 'cmd -> PersistentActorContinuation<'memEvent, 'memState>)
        (updateState : 'memState -> 'memEvent -> 'memState)
        (initialState : 'memState)
        (memState2db : 'memState -> 'dbState)
        (dbState2mem : 'dbState -> 'memState)
        (memEvent2db : 'memEvent -> 'dbEvent)
        (dbEvent2mem : 'dbEvent -> 'memEvent)
        (onRecoveryCompleted : 'memState -> unit)
        (mailbox: Eventsourced<_>) =

        let rec loop (memState : 'memState) eventCountSinceLastSnapshot =
            actor {
                let! (message : obj) = mailbox.Receive ()
                return!
                    try
                        match message with
                        | :? 'cmd as cmd ->
                            match handle memState cmd with
                            | Store event -> memEvent2db event |> box |> Persist :> Effect<_>
                            | StoreAll events -> events |> List.map (memEvent2db >> box) |> List.toSeq |> PersistAll :> Effect<_>
                            | Loop -> loop memState eventCountSinceLastSnapshot
                            | LoopWithExtendedState state -> loop state eventCountSinceLastSnapshot
                        | :? TakeSnapshotCommand ->
                            let snapshotStore = typed mailbox.SnapshotStore
                            snapshotStore <! SaveSnapshot(SnapshotMetadata(mailbox.Pid, mailbox.LastSequenceNr ()), memState2db memState)
                            loop memState eventCountSinceLastSnapshot
                        // This message appears 1) after data has been persisted or 2) when restoring state from event stream
                        | :? 'dbEvent as dbEvent ->
                            let updated = updateState memState (dbEvent2mem dbEvent)
                            let eventCountSinceLastSnapshot = eventCountSinceLastSnapshot + 1
                            // Trigger on every multiple of SnapshotFrequency, not only on exact equality:
                            // the counter can pass SnapshotFrequency without being reset (a failed snapshot,
                            // or a replay of more than SnapshotFrequency events), and an equality test would
                            // then never fire again, silently disabling snapshots for this actor.
                            if eventCountSinceLastSnapshot % SnapshotFrequency = 0 then mailbox.Self <! box (TakeSnapshotCommand())
                            loop updated eventCountSinceLastSnapshot
                        // Restoring state from snapshot
                        | :? SnapshotOffer as o ->
                            let dbState = o.Snapshot :?> 'dbState
                            loop (dbState2mem dbState) 0
                        | :? PersistentLifecycleEvent as e ->
                            match e with
                            | ReplaySucceed ->
                                logDebugf mailbox "Replaying persistent events succeeded. Current state [%A]" memState
                                loop memState eventCountSinceLastSnapshot
                            | ReplayFailed (exn, message) ->
                                let text = sprintf "ReplayFailed when replaying persistent events. Current state [%A]" memState
                                logErrorWithExnf mailbox exn "ReplayFailed due to error when processing message %O [%A]" (message.GetType()) message
                                failwith text
                        | :? LifecycleEvent as e ->
                            match e with
                            | PreRestart (exn, message) ->
                                // The failing message may be null, in which case GetType() cannot be called on it.
                                match message with
                                | null -> logErrorWithExnf mailbox exn "PreRestart due to error when processing <null> message"
                                | _ -> logErrorWithExnf mailbox exn "PreRestart due to error when processing message %O [%A]" (message.GetType()) message
                                loop memState eventCountSinceLastSnapshot
                            | _ -> IgnoredMessage
                        | :? Akka.Persistence.RecoveryCompleted ->
                            logDebugf mailbox "Replaying persistent events succeeded. Current state [%A]" memState
                            onRecoveryCompleted memState
                            loop memState eventCountSinceLastSnapshot
                        | :? SaveSnapshotSuccess -> loop memState (eventCountSinceLastSnapshot - SnapshotFrequency)
                        | :? SaveSnapshotFailure as e ->
                            let text = sprintf "Error when saving snapshot. Metadata [%A]. Current state [%A]" e.Metadata memState
                            logErrorWithExn mailbox e.Cause text
                            loop memState eventCountSinceLastSnapshot
                        | _ -> UnhandledMessage
                    with exn ->
                        // Log with the offending message for diagnostics, then let the failure propagate.
                        logErrorWithExnf mailbox exn "Error processing persistent actor message %A" message
                        reraise ()
            }
        loop initialState 0

    (*
        Simpler version of persistentActorTemplate. In this actor, there is no difference between state and event.
        State is the event and vice versa, they are of the same type.
        This means that each new event replaces the actor state completely.
        The state converters (memState2db / dbState2mem) are therefore reused as the event converters.
    *)
    let basicPersistentActorTemplate<'memState, 'dbState, 'cmd>
        (handler : 'memState -> 'cmd -> PersistentActorContinuation<'memState, 'memState>)
        (initialState : 'memState)
        (memState2db : 'memState -> 'dbState)
        (dbState2mem : 'dbState -> 'memState)
        (onRecoveryCompleted : 'memState -> unit)
        (mailbox: Eventsourced<_>) =
        persistentActorTemplate handler (fun _ newState -> newState) initialState memState2db dbState2mem memState2db dbState2mem onRecoveryCompleted mailbox
4. Example of a persistent actor | |
/// In-memory representation of a storage assignment, with conversions to and from
/// its persistent counterpart PersistentTypes.AkamaiStorageAssignment.
type StorageAssignment =
    {
        StorageId : string
        EdgeChar : string
        Timestamp : DateTimeOffset
    }

    /// Builds the in-memory record by copying the three fields of the persistent value.
    static member FromPersistentType (db : PersistentTypes.AkamaiStorageAssignment) =
        {
            StorageAssignment.StorageId = db.StorageId
            EdgeChar = db.EdgeChar
            Timestamp = db.Timestamp
        }

    /// Builds the persistent value from the in-memory record.
    static member ToPersistentType (mem : StorageAssignment) =
        PersistentTypes.AkamaiStorageAssignment.Create(mem.StorageId, mem.EdgeChar, mem.Timestamp)

    /// The "no assignment" value, derived from the persistent type's own Zero().
    static member Zero =
        PersistentTypes.AkamaiStorageAssignment.Zero()
        |> StorageAssignment.FromPersistentType
/// Commands understood by the storage assignment actor.
type StorageCommand =
    /// Stores the given storage details in the persistent actor, replacing any earlier assignment.
    | AssignStorage of StorageAssignment
    /// Asks for the current storage assignment; the actor replies to the sender with it.
    /// When nothing has been assigned yet (empty StorageId), the actor picks a storage,
    /// replies with the new assignment and persists it.
    | QueryStorage
/// Persistent actor that remembers which storage an entity is assigned to.
/// - AssignStorage: persists the supplied assignment.
/// - QueryStorage: replies to the sender with the current assignment. If none exists yet
///   (empty StorageId), asks `storagePicker` for a storage, combines it with a randomly chosen
///   edge character ("a", "b" or "c") and the current time, replies with it and persists it.
/// Built on basicPersistentActorTemplate, so each stored assignment replaces the state entirely.
let akamaiStorageAssignmentActor (storagePicker:IActorRef<StoragePickerCommand>) (mailbox:Eventsourced<_>) =

    let rng = Random()
    let edgeChars = [| "a"; "b"; "c" |]

    // Next(3) yields 0, 1 or 2, i.e. a valid index into edgeChars.
    let makeEdgeChar () = edgeChars.[rng.Next(3)]

    let handler state cmd =
        match cmd with
        | AssignStorage assignment ->
            Store assignment
        | QueryStorage ->
            if String.IsNullOrEmpty(state.StorageId) then
                // NOTE(review): this ask blocks the actor until the storage picker answers.
                let (pickedId : string, _ : IActorRef<Message<SftpCommand>>) =
                    Async.RunSynchronously (storagePicker <? SelectStorage)
                let edgeChar = makeEdgeChar ()
                let assignment = { StorageAssignment.StorageId = pickedId; EdgeChar = edgeChar; Timestamp = DateTimeOffset.Now }
                // NOTE(review): the reply is sent before the assignment has been persisted.
                mailbox.Sender() <! assignment
                Store assignment
            else
                mailbox.Sender() <! state
                Loop

    basicPersistentActorTemplate
        handler
        StorageAssignment.Zero
        StorageAssignment.ToPersistentType
        StorageAssignment.FromPersistentType
        ignore
        mailbox
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment