Skip to content

Instantly share code, notes, and snippets.

@sdcondon
Last active September 24, 2017 11:16
Show Gist options
Save sdcondon/5be25120916beb6a27189f0cfb173f13 to your computer and use it in GitHub Desktop.
A simple periodic batching pipeline using BlockingCollections
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
class BatchingPipelineDemo
{
    // "Environmental" factors
    private static readonly Random Rnd = new Random();
    private static readonly object RndLock = new object(); // System.Random is not thread-safe; all access goes through NextRandom.
    private const int InputIntervalMsLow = 100;
    private const int InputIntervalMsHigh = 1900;
    private const int EmitBatchDelayMsLow = 1000;
    private const int EmitBatchDelayMsHigh = 4000;
    private const int EmitBatchFailurePercentage = 0;

    // Params for Batching Pipeline
    private const int MaxBatchSize = 3;
    private const int MaxBatchBacklog = 5;
    private static readonly TimeSpan MaxBatchInterval = TimeSpan.FromSeconds(3);

    /// <summary>
    /// Entry point. Pumps randomly-timed messages into a <see cref="BatchingPipeline{T}"/> until a key
    /// is pressed, then shuts the pipeline down.
    /// </summary>
    internal static void Main()
    {
        using (var batcher = new BatchingPipeline<string>(MaxBatchSize, MaxBatchInterval, MaxBatchBacklog, EmitBatch))
        {
            int i = 0;
            Timer inputTimer = null;

            // Self-re-arming one-shot timer: each callback enqueues one message then schedules the next
            // callback after a fresh random interval. Timeout.Infinite (rather than int.MaxValue) is the
            // idiomatic way to say "don't repeat".
            inputTimer = new Timer(o =>
            {
                batcher.Enqueue($"Message {i++}");
                try
                {
                    inputTimer.Change(NextRandom(InputIntervalMsLow, InputIntervalMsHigh), Timeout.Infinite);
                }
                catch (ObjectDisposedException)
                {
                    // Benign race: the timer was disposed between this callback starting and re-arming it.
                }
            }, null, 0, Timeout.Infinite);

            Console.WriteLine("Press any key to stop the pipeline.");
            Console.ReadKey(true);
            Log("Stopping.");

            // Dispose(WaitHandle) signals once any in-flight callback has finished, so no Enqueue can
            // race against the pipeline's Dispose (which calls CompleteAdding on the input queue).
            using (var timerDisposed = new ManualResetEvent(false))
            {
                inputTimer.Dispose(timerDisposed);
                timerDisposed.WaitOne();
            }
        }

        Log("Pipeline stopped. Press any key to exit.");
        Console.ReadKey(true);
    }

    /// <summary>
    /// Thread-safe random number helper. Timer callbacks and the batch emitter run on different threads,
    /// and concurrent unsynchronized calls can silently corrupt a shared <see cref="Random"/> instance.
    /// </summary>
    /// <param name="minValue">Inclusive lower bound.</param>
    /// <param name="maxValue">Exclusive upper bound.</param>
    /// <returns>A pseudo-random integer in [minValue, maxValue).</returns>
    private static int NextRandom(int minValue, int maxValue)
    {
        lock (RndLock)
        {
            return Rnd.Next(minValue, maxValue);
        }
    }

    /// <summary>
    /// Simulated batch sink: sleeps for a random period, fails a configurable percentage of the time,
    /// and otherwise logs the batch contents.
    /// </summary>
    /// <typeparam name="T">The type of the batched messages.</typeparam>
    /// <param name="messages">The batch to emit.</param>
    /// <exception cref="InvalidOperationException">Thrown (randomly) to simulate an emission failure.</exception>
    private static void EmitBatch<T>(ICollection<T> messages)
    {
        var sleepFor = NextRandom(EmitBatchDelayMsLow, EmitBatchDelayMsHigh);
        Thread.Sleep(sleepFor);
        if (NextRandom(0, 100) < EmitBatchFailurePercentage)
        {
            throw new InvalidOperationException("KABOOM");
        }

        Log($"Emitting {messages.Count} messages after {sleepFor}ms delay:\r\n\t{string.Join("\r\n\t", messages)}");
    }

    /// <summary>
    /// Writes a timestamped message to the console.
    /// </summary>
    /// <param name="message">The message to log.</param>
    private static void Log(string message)
    {
        Console.WriteLine($"[{DateTime.Now.ToString("HH:mm:ss.fff")}] {message}");
    }

    /// <summary>
    /// Batching pipeline logic that uses <see cref="BlockingCollection{T}"/> instances. Batches are emitted after a given interval or after they reach a given size, whichever happens first.
    /// </summary>
    /// <typeparam name="T">The type of objects that the pipeline can process.</typeparam>
    class BatchingPipeline<T> : IDisposable
    {
        private readonly int maxBatchSize;
        private readonly TimeSpan maxBatchInterval;
        private readonly BlockingCollection<T> inputQueue;
        private readonly BlockingCollection<ICollection<T>> batchQueue;
        private readonly Action<ICollection<T>> emitBatch;

        // Not readonly: both are reassigned by the restart continuation if a pump task faults.
        private Task inputBatcherTask;
        private Task batchEmitterTask;

        /// <summary>
        /// Initializes a new instance of the <see cref="BatchingPipeline{T}"/> class.
        /// </summary>
        /// <param name="maxBatchSize">The size at which to emit a batch immediately.</param>
        /// <param name="maxBatchInterval">The interval after which to emit a batch.</param>
        /// <param name="maxBatchBacklog">Size of batch backlog beyond which batches will be discarded.</param>
        /// <param name="emitBatch">Action to invoke to emit a batch.</param>
        public BatchingPipeline(int maxBatchSize, TimeSpan maxBatchInterval, int maxBatchBacklog, Action<ICollection<T>> emitBatch)
        {
            this.maxBatchSize = maxBatchSize;
            this.maxBatchInterval = maxBatchInterval;
            this.inputQueue = new BlockingCollection<T>();
            this.batchQueue = new BlockingCollection<ICollection<T>>(maxBatchBacklog);
            this.emitBatch = emitBatch;

            // Each pump runs on its own long-running task, and is restarted whenever it ends
            // before its queue has been fully drained (i.e. it faulted mid-stream).
            StartTaskWithRestartContinuation(nameof(inputBatcherTask), () => inputBatcherTask = Task.Factory.StartNew(BatchInput, TaskCreationOptions.LongRunning), () => !inputQueue.IsCompleted);
            StartTaskWithRestartContinuation(nameof(batchEmitterTask), () => batchEmitterTask = Task.Factory.StartNew(EmitBatches, TaskCreationOptions.LongRunning), () => !batchQueue.IsCompleted);
        }

        /// <summary>
        /// Enqueue a message to be batched.
        /// </summary>
        /// <param name="message">The message.</param>
        /// <exception cref="InvalidOperationException">Thrown if called after <see cref="Dispose"/>.</exception>
        public void Enqueue(T message)
        {
            inputQueue.Add(message);
            Log($"Enqueued message. Backlog: {inputQueue.Count} messages, {batchQueue.Count} batches)");
        }

        /// <inheritdoc />
        /// <remarks>Blocks until both pump tasks have drained their queues (modulo the restart race noted below).</remarks>
        public void Dispose()
        {
            inputQueue.CompleteAdding();

            // Consume all messages and errors while disposing
            if (inputBatcherTask != null)
            {
                try
                {
                    inputBatcherTask.Wait();
                }
                catch (AggregateException)
                {
                    // Fault details were already logged by the restart continuation.
                }

                inputBatcherTask.Dispose();
            }

            // BUG: If an error occurs while processing a batch that is not the last, the task will end and
            // this method will return. However, the task will then be restarted, meaning the pipeline
            // is not done by the time dispose returns, which could cause issues. Could be resolved by using
            // a parent task that handles the restarts.
            if (batchEmitterTask != null)
            {
                try
                {
                    batchEmitterTask.Wait();
                }
                catch (AggregateException)
                {
                    // As above - the restart continuation has already logged the fault.
                }

                batchEmitterTask.Dispose();
            }

            // NB: The BlockingCollections are deliberately NOT disposed here: a restarted emitter task
            // (see BUG note above) may still be reading them, and touching a disposed BlockingCollection
            // throws ObjectDisposedException. The cost of leaving them to the GC is negligible.
        }

        /// <summary>
        /// Pump that drains the input queue into batches and hands each batch to the batch queue.
        /// A batch is closed when it reaches <see cref="maxBatchSize"/> items or when
        /// <see cref="maxBatchInterval"/> has elapsed, whichever comes first.
        /// </summary>
        private void BatchInput()
        {
            while (!inputQueue.IsCompleted)
            {
                var deadline = DateTime.UtcNow + maxBatchInterval;
                var batch = new List<T>(maxBatchSize);
                while (!inputQueue.IsCompleted && batch.Count < maxBatchSize)
                {
                    // Guard against the clock ticking past the deadline between checks: TryTake throws
                    // ArgumentOutOfRangeException for a negative timeout.
                    var remaining = deadline - DateTime.UtcNow;
                    if (remaining < TimeSpan.Zero)
                    {
                        break;
                    }

                    if (inputQueue.TryTake(out T input, remaining))
                    {
                        batch.Add(input);
                    }
                }

                if (batch.Count > 0)
                {
                    // Non-blocking add: if the backlog is full we drop the batch rather than stall batching.
                    if (!batchQueue.TryAdd(batch))
                    {
                        Log($"Discarded batch of {batch.Count} messages due to unacceptable backlog size ({batchQueue.Count} / {batchQueue.BoundedCapacity} batches of at most {maxBatchSize} messages each).");
                    }
                }
            }

            // Input is complete and drained, so no more batches can ever be produced.
            batchQueue.CompleteAdding();
            Log("Batcher finished.");
        }

        /// <summary>
        /// Pump that takes batches from the batch queue and emits them via <see cref="emitBatch"/>.
        /// On emission failure the batch is re-queued (at the back of the queue) and the exception is
        /// rethrown so the restart continuation can log it and spin up a replacement pump.
        /// </summary>
        private void EmitBatches()
        {
            foreach (var batch in batchQueue.GetConsumingEnumerable())
            {
                try
                {
                    // NB: Synchronous. A couple of things to consider if making asynchronous:
                    // - any limit on parallelism - don't want to swamp the backing store if we start falling behind..
                    // - shutdown should prob be synchronized
                    Log($"Emitting batch. Backlog: {inputQueue.Count} messages, {batchQueue.Count} batches)");
                    emitBatch(batch);
                    Log($"Emitted batch. Backlog: {inputQueue.Count} messages, {batchQueue.Count} batches)");
                }
                catch (Exception ex)
                {
                    // No back-off or specific handling of bad batches here, but wouldn't be difficult to do.
                    if (!batchQueue.IsAddingCompleted)
                    {
                        // NB: Requeue messages so bad batch doesn't delay others. Means messages will be emitted out of order though.
                        // Easy to change.. Inner loop, for example. Prob a good idea, esp if have a requirement for back-off.
                        if (batchQueue.TryAdd(batch))
                        {
                            Log($"Re-queued batch of {batch.Count} messages due to emission failure: {ex}");
                        }
                        else
                        {
                            Log($"Discarded batch of {batch.Count} messages due to emission failure while backlog was unacceptably long.");
                        }
                    }
                    else
                    {
                        Log($"Discarded batch of {batch.Count} messages due to emission failure while shutting down.");
                    }

                    throw;
                }
            }

            Log("Emitter finished.");
        }

        /// <summary>
        /// Starts a pump task and attaches a continuation that restarts it (re-attaching this same
        /// continuation recursively) whenever it ends while its work is unfinished.
        /// </summary>
        /// <param name="name">Name of the task, for logging.</param>
        /// <param name="makeTask">Factory that starts the task and stores it in the relevant field.</param>
        /// <param name="shouldRestart">Predicate indicating whether the task still has work to do.</param>
        private void StartTaskWithRestartContinuation(string name, Func<Task> makeTask, Func<bool> shouldRestart)
        {
            makeTask().ContinueWith(t =>
            {
                if (shouldRestart())
                {
                    Log($"{name} encountered an issue. Restarting. {t.Exception}");
                    StartTaskWithRestartContinuation(name, makeTask, shouldRestart);
                }
            });
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment