Created
March 18, 2017 14:20
-
-
Save yevhen/c13aaf7a6500be701054f99edec30118 to your computer and use it in GitHub Desktop.
Example of AsyncBatcher pattern for acknowledged processing using Orleankka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Buffers incoming <c>SaveMention</c> commands and forwards them to the topic
/// actor as a single <c>SaveMentionBatch</c>, either once <see cref="BatchSize"/>
/// commands have accumulated or when the periodic "flush" timer fires.
/// Each sender's reply is withheld until the batch containing its command has
/// been answered (or has failed), so processing is acknowledged per command.
/// </summary>
/// <remarks>
/// NOTE(review): the <c>Reentrant</c> attribute presumably lets the runtime
/// interleave new messages while a flush is awaiting the topic - confirm against
/// the Orleankka documentation. The buffer snapshot in <c>Flush</c> relies on that.
/// </remarks>
[Reentrant("IsReentrant")]
public class SaveMentionBatcher : Actor
{
    /// <summary>Predicate named by the <c>Reentrant</c> attribute; accepts every message.</summary>
    public static bool IsReentrant(object message) => true;

    /// <summary>Used as both the initial delay and the period of the flush timer.</summary>
    public static TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    /// <summary>Number of buffered commands that triggers an immediate flush.</summary>
    public static int BatchSize = 512;

    readonly List<BufferItem> buffer = new List<BufferItem>();
    ActorRef topic;

    /// <summary>
    /// Resolves the topic actor that shares this actor's id and registers the
    /// periodic flush timer.
    /// </summary>
    public override Task OnActivate()
    {
        // The original assigned to an undeclared `target`; the field that Flush uses is `topic`.
        topic = System.TypedActorOf<ITopic>(Id);
        Timers.Register("flush", due: FlushTimeout, period: FlushTimeout, Flush);
        return base.OnActivate();
    }

    /// <summary>
    /// Buffers the command, flushes when the buffer is full, and then waits for
    /// the status assigned to this particular command.
    /// </summary>
    /// <param name="cmd">The command to include in the next batch.</param>
    /// <returns>The status reported for <paramref name="cmd"/> in the batch result.</returns>
    async Task<SaveMentionStatus> On(SaveMention cmd)
    {
        var item = new BufferItem(cmd);
        buffer.Add(item);

        // `>=` rather than `==`: BatchSize is a mutable static, so the buffer may
        // already hold more items than a newly lowered limit.
        if (buffer.Count >= BatchSize)
            await Flush();

        return await item.Task();
    }

    /// <summary>
    /// Sends everything currently buffered to the topic as one batch and
    /// completes each buffered item with its own result, or with the error if
    /// the batch failed.
    /// </summary>
    async Task Flush()
    {
        // The timer fires regardless of traffic; do not send an empty batch.
        if (buffer.Count == 0)
            return;

        // Snapshot and clear before awaiting, so commands that arrive while the
        // request is in flight go into the next batch.
        var items = buffer.ToList();
        buffer.Clear();

        try
        {
            var batch = new SaveMentionBatch(items.Select(x => x.Command));
            var result = await topic.Ask<SaveMentionBatchResult>(batch);
            items.ForEach(x => x.CompleteResult(result.Items[x.Command.Id]));
        }
        catch (Exception ex)
        {
            // Some items may already be completed if the failure happened while
            // distributing results; CompleteError skips those and fails the rest.
            items.ForEach(x => x.CompleteError(ex));
        }
    }

    /// <summary>
    /// Pairs a buffered command with the completion source that its sender awaits.
    /// </summary>
    class BufferItem
    {
        readonly TaskCompletionSource<SaveMentionStatus> source;

        /// <summary>The task completed by <see cref="CompleteResult"/> or <see cref="CompleteError"/>.</summary>
        public Task<SaveMentionStatus> Task() => source.Task;

        public readonly SaveMention Command;

        public BufferItem(SaveMention command)
        {
            source = new TaskCompletionSource<SaveMentionStatus>();
            Command = command;
        }

        // The Try* variants make completion idempotent: completing an item a second
        // time is a no-op instead of an InvalidOperationException, which would
        // otherwise abort error propagation and leave later items pending forever.
        public void CompleteResult(SaveMentionStatus status) => source.TrySetResult(status);
        public void CompleteError(Exception exception) => source.TrySetException(exception);
    }
}
or by returning a bool
signaling the message acceptance status
if (flushing && buffer.Count + 1 == BatchSize)
return SaveMentionStatus.Rejected;
either way, it is a waste of bandwidth, which could be avoided if the message is queued locally
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Backpressure could be implemented by throwing an exception. Along the lines:
but it's a call-stack waste