Skip to content

Instantly share code, notes, and snippets.

@promontis
Created October 11, 2019 16:53
Show Gist options
  • Save promontis/b69b91cb0f596d9fcb151cab0b199eb2 to your computer and use it in GitHub Desktop.
Save promontis/b69b91cb0f596d9fcb151cab0b199eb2 to your computer and use it in GitHub Desktop.
public class Subscriber : ISubscribeSynchronousToAll
{
private readonly StreamsDBClient _client;
private readonly string _assemblyName;
public Subscriber(StreamsDBClient client)
{
_client = client;
_assemblyName = Assembly.GetEntryAssembly().GetName().Name;
}
public async Task HandleAsync(IReadOnlyCollection<IDomainEvent> domainEvents, CancellationToken cancellationToken)
{
var offset = await GetOffset();
offset += domainEvents.Count;
await SetOffset(offset);
}
private async Task SetOffset(long offset)
{
var messageInput = new MessageInput
{
ID = Guid.NewGuid().ToString(),
Value = Encoding.UTF8.GetBytes(offset.ToString()),
Type = "offset"
};
await _client.DB().AppendStream(_assemblyName, messageInput);
}
public async Task<long> GetOffset()
{
var (message, found) = await _client.DB().ReadLastMessageFromStream(_assemblyName);
if (!found)
{
return 0;
}
return long.Parse(Encoding.UTF8.GetString(message.Value));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment