Skip to content

Instantly share code, notes, and snippets.

@object
Created May 20, 2018 07:29
Show Gist options
  • Save object/0cd39086ec7758e6a0d3d2a510894ccb to your computer and use it in GitHub Desktop.
Save object/0cd39086ec7758e6a0d3d2a510894ccb to your computer and use it in GitHub Desktop.
Akkling persistence example
1. Configuration section (HOCON embedded in an XML configuration section)
<?xml version="1.0" encoding="utf-8"?>
<akka.persistence>
<hocon>
<![CDATA[
# Akka.Persistence settings: a SQL Server journal and a SQL Server snapshot store.
akka {
persistence{
query.journal.sql {
max-buffer-size = 10000
}
journal {
# Points at the sql-server journal block defined right below.
plugin = "akka.persistence.journal.sql-server"
sql-server {
class = "Akka.Persistence.SqlServer.Journal.BatchingSqlServerJournal, Akka.Persistence.SqlServer"
schema-name = dbo
table-name = EventJournal
# NOTE(review): with auto-initialize off the journal table presumably has to exist already - confirm against the plugin documentation.
auto-initialize = off
# Registers a single adapter under the name json-adapter; the bindings below refer to it by that name.
event-adapters {
json-adapter = "Nrk.Oddjob.Upload.PersistenceUtils+EventAdapter, Upload"
}
# Both directions are routed through json-adapter: any object on the way in, JObject payloads on the way out.
event-adapter-bindings {
# to journal
"System.Object, mscorlib" = json-adapter
# from journal
"Newtonsoft.Json.Linq.JObject, Newtonsoft.Json" = [json-adapter]
}
}
}
snapshot-store {
# Points at the sql-server snapshot store block defined right below.
plugin = "akka.persistence.snapshot-store.sql-server"
sql-server {
class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
# Snapshots use the hyperion serializer, unlike journal events which go through json-adapter.
serializer = hyperion
schema-name = dbo
table-name = SnapshotStore
# NOTE(review): as for the journal, the snapshot table presumably has to be created up front - confirm.
auto-initialize = off
}
}
}
}
]]>
</hocon>
</akka.persistence>
2. Event adapter
/// Turns a journaled JObject back into a typed event.
/// The CLR type name is read from the `eventType` property of the JObject; when the
/// property is missing, or the named type cannot be resolved, the JObject itself is
/// returned (boxed) unchanged.
/// The caller must pass a JObject: the downcast throws InvalidCastException otherwise.
let private deserializeEvent (evt : obj) =
    let json = evt :?> Newtonsoft.Json.Linq.JObject
    // Resolve the target type, or null when there is nothing to resolve.
    let resolvedType : Type =
        match json.[eventType] with
        | null -> null
        | typeName -> Type.GetType(typeName.ToString())
    if isNull resolvedType then
        box json
    else
        json.ToObject(resolvedType)
/// Event adapter that stores events as JObject payloads and restores them on read.
/// Writing: the event is converted to a JObject, its CLR type name is inserted as the
/// first property (named by `eventType`), and the result is wrapped in a Tagged
/// envelope carrying `anyEventTag`.
/// Reading: JObject payloads are passed to deserializeEvent; anything else yields an
/// empty event sequence.
type EventAdapter(__ : Akka.Actor.ExtendedActorSystem) =
    interface Akka.Persistence.Journal.IEventAdapter with

        /// Manifest is always "<JObject full type name>,<its assembly name>", regardless of the event.
        member __.Manifest(_ : obj) =
            let jObjectType = typeof<Newtonsoft.Json.Linq.JObject>
            let assemblyName = jObjectType.Assembly.GetName().Name
            sprintf "%s,%s" jObjectType.FullName assemblyName

        /// Wraps the event into a tagged JObject that also records the event's type name.
        member __.ToJournal(evt : obj) : obj =
            let payload = Newtonsoft.Json.Linq.JObject.FromObject(evt)
            // The type name goes first so that deserializeEvent can find the target type.
            let typeProperty = Newtonsoft.Json.Linq.JProperty(eventType, evt.GetType().FullName)
            payload.AddFirst(typeProperty)
            Akka.Persistence.Journal.Tagged(box payload, [| anyEventTag |]) :> obj

        /// Restores a single event from a JObject payload; other payload types are dropped.
        member __.FromJournal(evt : obj, _ : string) : Akka.Persistence.Journal.IEventSequence =
            match evt with
            | :? Newtonsoft.Json.Linq.JObject ->
                Akka.Persistence.Journal.EventSequence.Single(deserializeEvent evt)
            | _ ->
                Akka.Persistence.Journal.EventSequence.Empty
3. Persistent actors templates that we use for most of our actors
[<AutoOpen>]
module PersistenceActorTemplates =

    open Akka
    open Akka.Persistence
    open Akkling
    open Akkling.Persistence
    open Nrk.Oddjob.Core
    open Nrk.Oddjob.Core.ActorUtils

    /// Message a templated actor sends to itself to request that a snapshot of its state is saved.
    type TakeSnapshotCommand() = class end

    /// Result of a command handler: tells the template how to continue.
    type PersistentActorContinuation<'memEvent, 'memState> =
        /// Persist a single event.
        | Store of 'memEvent
        /// Persist a list of events.
        | StoreAll of 'memEvent list
        /// Persist nothing and keep the current state.
        | Loop
        /// Only used in originAssetAssignmentActor for reasons explained there. Please don't use it.
        | LoopWithExtendedState of 'memState

    /// Number of applied events after which the actor asks itself to take a snapshot.
    let [<Literal>] SnapshotFrequency = 100

    (*
        This function is used to create implementations of persistent actors.

        'dbState and 'dbEvent live in database. They should be treated with care since it is hard to change them.
        'memState and 'memEvent live in memory of the actor, and can be changed easily.
        State of the actor (both in db and memory) is constructed from incoming events.

        Notable functions:
        - handle: receives current state and incoming command, handles the command and returns information
          for the persistent layer how to proceed
        - updateState: used when actor is restored from db (after restart or after storing a new event).
          Takes state and event and produces new state.

        Other parameters:
        - initialState: state the actor starts with before any snapshot or event is applied
        - memState2db / dbState2mem: conversions between in-memory state and snapshot state
        - memEvent2db / dbEvent2mem: conversions between in-memory events and persisted events
        - onRecoveryCompleted: called with the current state when RecoveryCompleted is received

        Any exception raised while handling a message is logged and re-raised.
    *)
    let persistentActorTemplate<'memState, 'dbState, 'memEvent, 'dbEvent, 'cmd>
            (handle : 'memState -> 'cmd -> PersistentActorContinuation<'memEvent, 'memState>)
            (updateState : 'memState -> 'memEvent -> 'memState)
            (initialState : 'memState)
            (memState2db : 'memState -> 'dbState)
            (dbState2mem : 'dbState -> 'memState)
            (memEvent2db : 'memEvent -> 'dbEvent)
            (dbEvent2mem : 'dbEvent -> 'memEvent)
            (onRecoveryCompleted : 'memState -> unit)
            (mailbox: Eventsourced<_>) =

        let rec loop (memState : 'memState) eventCountSinceLastSnapshot =
            actor {
                let! (message : obj) = mailbox.Receive ()
                return!
                    try
                        match message with
                        | :? 'cmd as cmd ->
                            match handle memState cmd with
                            | Store event -> memEvent2db event |> box |> Persist :> Effect<_>
                            | StoreAll events -> events |> List.map (memEvent2db >> box) |> List.toSeq |> PersistAll :> Effect<_>
                            | Loop -> loop memState eventCountSinceLastSnapshot
                            | LoopWithExtendedState state -> loop state eventCountSinceLastSnapshot
                        | :? TakeSnapshotCommand ->
                            let snapshotStore = typed mailbox.SnapshotStore
                            snapshotStore <! SaveSnapshot(SnapshotMetadata(mailbox.Pid, mailbox.LastSequenceNr ()), memState2db memState)
                            loop memState eventCountSinceLastSnapshot
                        // This message appears 1) after data has been persisted or 2) when restoring state from event stream
                        | :? 'dbEvent as dbEvent ->
                            let updated = updateState memState (dbEvent2mem dbEvent)
                            let eventCountSinceLastSnapshot = eventCountSinceLastSnapshot + 1
                            // Request a snapshot on every multiple of SnapshotFrequency, not only on exact equality.
                            // The counter is reduced only on SaveSnapshotSuccess, so after a SaveSnapshotFailure it
                            // keeps growing; an equality test would then never fire again for the running actor.
                            if eventCountSinceLastSnapshot > 0 && eventCountSinceLastSnapshot % SnapshotFrequency = 0 then
                                mailbox.Self <! box (TakeSnapshotCommand())
                            loop updated eventCountSinceLastSnapshot
                        // Restoring state from snapshot
                        | :? SnapshotOffer as o ->
                            let dbState = o.Snapshot :?> 'dbState
                            loop (dbState2mem dbState) 0
                        | :? PersistentLifecycleEvent as e ->
                            match e with
                            | ReplaySucceed ->
                                logDebugf mailbox "Replaying persistent events succeeded. Current state [%A]" memState
                                loop memState eventCountSinceLastSnapshot
                            | ReplayFailed (exn, message) ->
                                let text = sprintf "ReplayFailed when replaying persistent events. Current state [%A]" memState
                                logErrorWithExnf mailbox exn "ReplayFailed due to error when processing message %O [%A]" (message.GetType()) message
                                failwith text
                        | :? LifecycleEvent as e ->
                            match e with
                            | PreRestart (exn, message) ->
                                // The failing message may be null, in which case GetType() cannot be called on it.
                                match message with
                                | null -> logErrorWithExnf mailbox exn "PreRestart due to error when processing <null> message"
                                | _ -> logErrorWithExnf mailbox exn "PreRestart due to error when processing message %O [%A]" (message.GetType()) message
                                loop memState eventCountSinceLastSnapshot
                            | _ -> IgnoredMessage
                        | :? Akka.Persistence.RecoveryCompleted ->
                            logDebugf mailbox "Replaying persistent events succeeded. Current state [%A]" memState
                            onRecoveryCompleted memState
                            loop memState eventCountSinceLastSnapshot
                        // A snapshot accounts for one SnapshotFrequency worth of events; events applied meanwhile stay counted.
                        | :? SaveSnapshotSuccess -> loop memState (eventCountSinceLastSnapshot - SnapshotFrequency)
                        | :? SaveSnapshotFailure as e ->
                            let text = sprintf "Error when saving snapshot. Metadata [%A]. Current state [%A]" e.Metadata memState
                            logErrorWithExn mailbox e.Cause text
                            loop memState eventCountSinceLastSnapshot
                        | _ -> UnhandledMessage
                    with exn ->
                        logErrorWithExnf mailbox exn "Error processing persistent actor message %A" message
                        reraise ()
            }

        loop initialState 0

    (*
        Simpler version of persistentActorTemplate. In this actor, there is no difference between state and event.
        State is the event and vice versa, they are of the same type.
        This means that each new event replaces the actor state completely.
    *)
    let basicPersistentActorTemplate<'memState, 'dbState, 'cmd>
            (handler : 'memState -> 'cmd -> PersistentActorContinuation<'memState, 'memState>)
            (initialState : 'memState)
            (memState2db : 'memState -> 'dbState)
            (dbState2mem : 'dbState -> 'memState)
            (onRecoveryCompleted : 'memState -> unit)
            (mailbox: Eventsourced<_>) =
        // The state converters double as event converters, and the newest event simply becomes the state.
        persistentActorTemplate
            handler
            (fun _ newState -> newState)
            initialState
            memState2db
            dbState2mem
            memState2db
            dbState2mem
            onRecoveryCompleted
            mailbox
4. Example of a persistent actor
/// In-memory representation of a storage assignment, with conversions to and from
/// its persisted counterpart PersistentTypes.AkamaiStorageAssignment.
type StorageAssignment =
    { StorageId : string
      EdgeChar : string
      Timestamp : DateTimeOffset }

    /// Copies the three fields of the persisted record into the in-memory record.
    static member FromPersistentType (db : PersistentTypes.AkamaiStorageAssignment) =
        { StorageAssignment.StorageId = db.StorageId
          EdgeChar = db.EdgeChar
          Timestamp = db.Timestamp }

    /// Builds the persisted record from the in-memory record.
    static member ToPersistentType (mem : StorageAssignment) =
        PersistentTypes.AkamaiStorageAssignment.Create(mem.StorageId, mem.EdgeChar, mem.Timestamp)

    /// The in-memory equivalent of the persisted type's Zero value.
    static member Zero =
        let persistedZero = PersistentTypes.AkamaiStorageAssignment.Zero()
        StorageAssignment.FromPersistentType persistedZero
/// Commands handled by akamaiStorageAssignmentActor.
type StorageCommand =
/// Assigns (overwriting previous assignment) storage details to a persistent actor
| AssignStorage of StorageAssignment
/// Checks current storage assignment and returns it to the sender. If no storage is assigned yet
/// (empty StorageId), a storage is requested from the storage picker, combined with a random edge char
/// and the current time, returned to the sender and stored as the new assignment.
| QueryStorage
/// Persistent actor that keeps a single StorageAssignment as its state.
/// - AssignStorage stores the given assignment.
/// - QueryStorage replies to the sender with the current assignment; when none exists yet
///   (empty StorageId) it first asks storagePicker for a storage, builds a new assignment
///   with a random edge char ("a", "b" or "c") and the current time, replies with it and stores it.
let akamaiStorageAssignmentActor (storagePicker:IActorRef<StoragePickerCommand>) (mailbox:Eventsourced<_>) =
    let random = Random()
    let edgeChars = [| "a"; "b"; "c" |]
    let makeEdgeChar () = edgeChars.[random.Next(3)]

    let handler (state : StorageAssignment) cmd =
        match cmd with
        | AssignStorage storage -> Store storage
        | QueryStorage ->
            if String.IsNullOrEmpty(state.StorageId) then
                // NOTE(review): this ask blocks the actor until the storage picker answers.
                let (storageId : string, _ : IActorRef<Message<SftpCommand>>) =
                    storagePicker <? SelectStorage |> Async.RunSynchronously
                let edgeChar = makeEdgeChar ()
                let assigned =
                    { StorageAssignment.StorageId = storageId
                      EdgeChar = edgeChar
                      Timestamp = DateTimeOffset.Now }
                // NOTE(review): the reply is sent before the assignment has been persisted.
                mailbox.Sender() <! assigned
                Store assigned
            else
                mailbox.Sender() <! state
                Loop

    basicPersistentActorTemplate
        handler
        StorageAssignment.Zero
        StorageAssignment.ToPersistentType
        StorageAssignment.FromPersistentType
        (fun _ -> ())
        mailbox
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment