Last active
September 24, 2017 11:16
-
-
Save sdcondon/5be25120916beb6a27189f0cfb173f13 to your computer and use it in GitHub Desktop.
A simple periodic batching pipeline using BlockingCollections
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
class BatchingPipelineDemo
{
    // "Environmental" factors that simulate input rate and downstream store behaviour.
    // NB: System.Random is NOT thread-safe, and Rnd is shared between the input timer thread
    // and the batch emitter thread - so every caller synchronizes on it via lock (Rnd).
    private static Random Rnd = new Random();
    private const int InputIntervalMsLow = 100;
    private const int InputIntervalMsHigh = 1900;
    private const int EmitBatchDelayMsLow = 1000;
    private const int EmitBatchDelayMsHigh = 4000;
    private const int EmitBatchFailurePercentage = 0;

    // Params for Batching Pipeline
    private const int MaxBatchSize = 3;
    private const int MaxBatchBacklog = 5;
    private static TimeSpan MaxBatchInterval = TimeSpan.FromSeconds(3);

    /// <summary>
    /// Entry point: pumps randomly-spaced messages into a batching pipeline until a key is pressed,
    /// then shuts the pipeline down cleanly.
    /// </summary>
    internal static void Main()
    {
        using (var batcher = new BatchingPipeline<string>(MaxBatchSize, MaxBatchInterval, MaxBatchBacklog, EmitBatch))
        {
            int i = 0;
            Timer inputTimer = null;
            inputTimer = new Timer(o =>
            {
                batcher.Enqueue($"Message {i++}");

                int nextDueMs;
                lock (Rnd) // Rnd is shared with the emitter thread - see field comment
                {
                    nextDueMs = Rnd.Next(InputIntervalMsLow, InputIntervalMsHigh);
                }

                // One-shot re-schedule. Timeout.Infinite (not int.MaxValue) is the documented
                // way to suppress periodic signalling - int.MaxValue would re-fire after ~25 days.
                inputTimer.Change(nextDueMs, Timeout.Infinite);
            }, null, 0, Timeout.Infinite);

            Console.WriteLine("Press any key to stop the pipeline.");
            Console.ReadKey(true);
            Log("Stopping.");

            // Dispose(WaitHandle) blocks until any in-flight callback has completed, so a racing
            // callback can't call Enqueue after the pipeline starts shutting down (the parameterless
            // Dispose() returns without waiting).
            using (var timerDisposed = new ManualResetEvent(false))
            {
                inputTimer.Dispose(timerDisposed);
                timerDisposed.WaitOne();
            }
        }

        Log("Pipeline stopped. Press any key to exit.");
        Console.ReadKey(true);
    }
private static void EmitBatch<T>(ICollection<T> messages) | |
{ | |
var sleepFor = Rnd.Next(EmitBatchDelayMsLow, EmitBatchDelayMsHigh); | |
Thread.Sleep(sleepFor); | |
if (Rnd.Next(100) < EmitBatchFailurePercentage) | |
{ | |
throw new InvalidOperationException("KABOOM"); | |
} | |
Log($"Emitting {messages.Count} messages after {sleepFor}ms delay:\r\n\t{string.Join("\r\n\t", messages)}"); | |
} | |
private static void Log(string message) | |
{ | |
Console.WriteLine($"[{DateTime.Now.ToString("HH:mm:ss.fff")}] {message}"); | |
} | |
/// <summary> | |
/// Bathing pipeline logic that uses <see cref="BlockingCollection{T}"/> instances. Batches are emitted after a given interval or after they reach a given size, whichever happens first. | |
/// </summary> | |
/// <typeparam name="T">The type of objects that the pipeline can process.</typeparam> | |
class BatchingPipeline<T> : IDisposable | |
{ | |
private readonly int maxBatchSize; | |
private readonly TimeSpan maxBatchInterval; | |
private BlockingCollection<T> inputQueue; | |
private Task inputBatcherTask; | |
private BlockingCollection<ICollection<T>> batchQueue; | |
private Task batchEmitterTask; | |
private Action<ICollection<T>> emitBatch; | |
/// <summary> | |
/// Initializes a new instance of the <see cref="BatchingPipeline{T}"/> class. | |
/// </summary> | |
/// <param name="maxBatchSize">The size at which to emit a batch immediately.</param> | |
/// <param name="maxBatchInterval">The interval after which to emit a batch.</param> | |
/// <param name="maxBatchBacklog">Size of batch backlog beyond which batches will be discarded.</param> | |
/// <param name="emitBatch">Action to invoke to emit a batch.</param> | |
public BatchingPipeline(int maxBatchSize, TimeSpan maxBatchInterval, int maxBatchBacklog, Action<ICollection<T>> emitBatch) | |
{ | |
this.maxBatchSize = maxBatchSize; | |
this.maxBatchInterval = maxBatchInterval; | |
this.inputQueue = new BlockingCollection<T>(); | |
this.batchQueue = new BlockingCollection<ICollection<T>>(maxBatchBacklog); | |
this.emitBatch = emitBatch; | |
StartTaskWithRestartContinuation(nameof(inputBatcherTask), () => inputBatcherTask = Task.Factory.StartNew(BatchInput, TaskCreationOptions.LongRunning), () => !inputQueue.IsCompleted); | |
StartTaskWithRestartContinuation(nameof(batchEmitterTask), () => batchEmitterTask = Task.Factory.StartNew(EmitBatches, TaskCreationOptions.LongRunning), () => !batchQueue.IsCompleted); | |
} | |
/// <summary> | |
/// Enqueue a message to be batched. | |
/// </summary> | |
/// <param name="message">The message.</param> | |
public void Enqueue(T message) | |
{ | |
inputQueue.Add(message); | |
Log($"Enqueued message. Backlog: {inputQueue.Count} messages, {batchQueue.Count} batches)"); | |
} | |
/// <inheritdoc /> | |
public void Dispose() | |
{ | |
inputQueue.CompleteAdding(); | |
// Consume all messages and errors while disposing | |
if (inputBatcherTask != null) | |
{ | |
try | |
{ | |
inputBatcherTask.Wait(); | |
} | |
catch (AggregateException) | |
{ | |
} | |
inputBatcherTask.Dispose(); | |
} | |
// BUG: If an error occurs while processing a batch that is not the last, the task will end and | |
// this method will return. However, the task will then be restarted, meaning tha pipeline | |
// is not done by the time dispose returns, which could cause issues. Could be resolved by using | |
// a parent task that handles the restarts. | |
if (batchEmitterTask != null) | |
{ | |
try | |
{ | |
batchEmitterTask.Wait(); | |
} | |
catch (AggregateException) | |
{ | |
} | |
batchEmitterTask.Dispose(); | |
} | |
} | |
private void BatchInput() | |
{ | |
while (!inputQueue.IsCompleted) | |
{ | |
var batchStartedAt = DateTime.UtcNow; | |
var batch = new List<T>(); | |
while (!inputQueue.IsCompleted && batch.Count < maxBatchSize && batchStartedAt + maxBatchInterval >= DateTime.UtcNow) | |
{ | |
if (inputQueue.TryTake(out T input, (batchStartedAt + maxBatchInterval) - DateTime.UtcNow)) | |
{ | |
batch.Add(input); | |
} | |
} | |
if (batch.Count > 0) | |
{ | |
if (!batchQueue.TryAdd(batch)) | |
{ | |
Log($"Discarded batch of {batch.Count} messages due to unacceptable backlog size ({batchQueue.Count} / {batchQueue.BoundedCapacity} batches of at most {maxBatchSize} messages each)."); | |
} | |
} | |
} | |
batchQueue.CompleteAdding(); | |
Log("Batcher finished."); | |
} | |
private void EmitBatches() | |
{ | |
foreach (var batch in batchQueue.GetConsumingEnumerable()) | |
{ | |
try | |
{ | |
// NB: Synchronous. A couple of things to consider if making asynchronous: | |
// - any limit on parallelism - don't want to swamp the backing store if we start falling behind.. | |
// - shutdown should prob be synchronized | |
Log($"Emitting batch. Backlog: {inputQueue.Count} messages, {batchQueue.Count} batches)"); | |
emitBatch(batch); | |
Log($"Emitted batch. Backlog: {inputQueue.Count} messages, {batchQueue.Count} batches)"); | |
} | |
catch (Exception ex) | |
{ | |
// No back-off or specific handling of bad batches here, but wouldn't be difficult to do. | |
if (!batchQueue.IsAddingCompleted) | |
{ | |
// NB: Requeue messages so bad batch doesn't delay others. Means messages will be emitted out of order though. | |
// Easy to change.. Inner loop, for example. Prob a good idea, esp if have a requirement for back-off. | |
if (batchQueue.TryAdd(batch)) | |
{ | |
Log($"Re-queued batch of {batch.Count} messages due to emission failure: {ex}"); | |
} | |
else | |
{ | |
Log($"Discarded batch of {batch.Count} messages due to emission failure while backlog was unacceptably long."); | |
} | |
} | |
else | |
{ | |
Log($"Discarded batch of {batch.Count} messages due to emission failure while shutting down."); | |
} | |
throw; | |
} | |
} | |
Log("Emitter finished."); | |
} | |
private void StartTaskWithRestartContinuation(string name, Func<Task> makeTask, Func<bool> shouldRestart) | |
{ | |
makeTask().ContinueWith(t => | |
{ | |
if (shouldRestart()) | |
{ | |
Log($"{name} encountered an issue. Restarting. {t.Exception}"); | |
StartTaskWithRestartContinuation(name, makeTask, shouldRestart); | |
} | |
}); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment