Skip to content

Instantly share code, notes, and snippets.

@yevhen
Created March 18, 2017 14:20
Show Gist options
  • Save yevhen/c13aaf7a6500be701054f99edec30118 to your computer and use it in GitHub Desktop.
Save yevhen/c13aaf7a6500be701054f99edec30118 to your computer and use it in GitHub Desktop.
Example of AsyncBatcher pattern for acknowledged processing using Orleankka
[Reentrant("IsReentrant")]
public class SaveMentionBatcher : Actor
{
public static bool IsReentrant(object message) => true;
public static TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);
public static int BatchSize = 512;
readonly List<BufferItem> buffer = new List<BufferItem>();
ActorRef topic;
public override Task OnActivate()
{
target = System.TypedActorOf<ITopic>(Id);
Timers.Register("flush", due: FlushTimeout, period: FlushTimeout, Flush);
return base.OnActivate();
}
async Task<SaveMentionStatus> On(SaveMention cmd)
{
var item = new BufferItem(cmd);
buffer.Add(item);
if (buffer.Count == BatchSize)
await Flush();
return await item.Task();
}
async Task Flush()
{
var items = buffer.ToList(); // snapshot
buffer.Clear();
try
{
var batch = new SaveMentionBatch(items.Select(x => x.Command));
var result = await topic.Ask<SaveMentionBatchResult>(batch);
items.ForEach(x => x.CompleteResult(result.Items[x.Command.Id]));
}
catch (Exception ex)
{
items.ForEach(x => x.CompleteError(ex));
}
}
class BufferItem
{
readonly TaskCompletionSource<SaveMentionStatus> source;
public Task<SaveMentionStatus> Task() => source.Task;
public readonly SaveMention Command;
public BufferItem(SaveMention command)
{
source = new TaskCompletionSource<SaveMentionStatus>();
Command = command;
}
public void CompleteResult(SaveMentionStatus status) => source.SetResult(status);
public void CompleteError(Exception exception) => source.SetException(exception);
}
}
@yevhen
Copy link
Author

yevhen commented Mar 18, 2017

Backpressure could be implemented by throwing an exception. Along the lines:

    async Task<SaveMentionStatus> On(SaveMention cmd)
    {
        if (flushing && buffer.Count + 1 == BatchSize)
            throw new AtMaxCapacityException();

        var item = new BufferItem(cmd);
        buffer.Add(item);
 
        if (buffer.Count == BatchSize)
        {
             flushing = true;
             await Flush();
        }

        return await item.Task();
    }

but it's a call-stack waste

@yevhen
Copy link
Author

yevhen commented Mar 18, 2017

or by returning bool signaling message acceptance status

        if (flushing && buffer.Count + 1 == BatchSize)
            return SaveMentionStatus.Rejected;

@yevhen
Copy link
Author

yevhen commented Mar 18, 2017

either way, it is a waste of the bandwidth, which could be avoided if message is queued locally

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment