Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Created January 26, 2017 09:08
Show Gist options
  • Save Horusiath/85f9593bd086dfe439bef476e658e07f to your computer and use it in GitHub Desktop.
Save Horusiath/85f9593bd086dfe439bef476e658e07f to your computer and use it in GitHub Desktop.
Akka.Persistence example with proto-buf.net as a custom domain event serializer
using System;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.SqlServer;
namespace TestApp
{
class Program
{
public static void Main()
{
var config = ConfigurationFactory.ParseString(@"
akka {
actor {
serializers {
proto-buf = ""TestApp.ProtoBufSerializer, TestApp""
}
serialization-bindings {
""TestApp.IProtoBufSerializable, TestApp"" = proto-buf
}
serialization-identifiers {
""TestApp.ProtoBufSerializer, TestApp"" = 120
}
}
persistence {
journal {
plugin = ""akka.persistence.journal.sql-server""
sql-server {
auto-initialize = on
connection-string = ""Server=.;Database=akka_persistence_tests;Trusted_Connection=True;""
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.sql-server""
sql-server {
auto-initialize = on
connection-string = ""Server=.;Database=akka_persistence_tests;Trusted_Connection=True;""
}
}
}
}
").WithFallback(SqlServerPersistence.DefaultConfiguration());
using (var system = ActorSystem.Create("system", config))
{
var pref = system.ActorOf(Props.Create(() => new ShoppingCart("@user")), "@user");
for (int i = 0; i < 100; i++)
{
pref.Tell(new AddItem("item-" + i));
}
var state = pref.Ask<ShoppingCartState>(new GetState()).Result;
Console.WriteLine($"{pref} state is: {string.Join(", ", state.Items)}");
Console.ReadLine();
}
}
}
}
using System;
using System.IO;
using Akka.Actor;
namespace TestApp
{
// protobuf.net serializer
public class ProtoBufSerializer : Akka.Serialization.Serializer
{
public override bool IncludeManifest => false;
public ProtoBufSerializer(ExtendedActorSystem system) : base(system)
{
}
public override byte[] ToBinary(object obj)
{
// good idea is to use Microsoft.IO.RecyclableMemoryStream here
using (var stream = new MemoryStream())
{
ProtoBuf.Serializer.Serialize(stream, obj);
stream.Position = 0;
return stream.ToArray();
}
}
public override object FromBinary(byte[] bytes, Type type)
{
// good idea is to use Microsoft.IO.RecyclableMemoryStream here
using (var stream = new MemoryStream(bytes))
{
return ProtoBuf.Serializer.Deserialize(type, stream);
}
}
}
}
using System;
using System.Collections.Generic;
using Akka.Persistence;
using ProtoBuf;
namespace TestApp
{
#region messages
public interface IProtoBufSerializable { }
public sealed class GetState { }
public sealed class AddItem
{
public readonly string Name;
public AddItem(string name)
{
Name = name;
}
}
[ProtoContract]
public sealed class ItemAdded : IProtoBufSerializable
{
[ProtoMember(1)] public readonly string Name;
[ProtoMember(2)] public readonly DateTime Timestamp;
public ItemAdded(string name, DateTime timestamp)
{
Name = name;
Timestamp = timestamp;
}
}
#endregion
[ProtoContract]
public sealed class ShoppingCartState : IProtoBufSerializable
{
[ProtoMember(1)]
public readonly List<string> Items = new List<string>();
}
public sealed class ShoppingCart : ReceivePersistentActor
{
private const int SnapshotAfter = 5;
public override string PersistenceId { get; }
private ShoppingCartState state;
private int snapshotCounter = 0;
public ShoppingCart(string persistenceId)
{
PersistenceId = persistenceId;
state = new ShoppingCartState();
Recover<ItemAdded>(e => UpdateState(e));
Recover<SnapshotOffer>(offer =>
{
state = offer.Snapshot as ShoppingCartState ?? new ShoppingCartState();
});
Command<AddItem>(cmd =>
{
Persist(new ItemAdded(cmd.Name, DateTime.UtcNow), e =>
{
UpdateState(e);
// snapshot every 5 events
if ((++snapshotCounter) % SnapshotAfter == 0)
{
SaveSnapshot(state);
}
});
});
Command<GetState>(_ => Sender.Tell(state, Self));
}
private void UpdateState(ItemAdded e)
{
state.Items.Add(e.Name);
}
}
}
@ismaelhamed
Copy link

ismaelhamed commented Jan 27, 2017

This works great for our custom domain events being persisted, but I noticed all the internal akka messages, like the ones persisted by the Sharding, are still serialized with the default serializer (wire) event thought I can see there's a ClusterShardingMessageSerializer with support for Google's Protobuf in place. But for some reason it doesn't kick in. I thought the idea would be to have everything that goes into persistence serialized in Google Protobuf, including akka's own messages.

@Havret
Copy link

Havret commented Mar 6, 2019

I am trying to mimic your solution but got type null in FromBinary.

@jameswilddev
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment