Skip to content

Instantly share code, notes, and snippets.

@jezzsantos
Created July 14, 2016 09:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jezzsantos/44bf844b27bebafcb64d9947e1896649 to your computer and use it in GitHub Desktop.
Save jezzsantos/44bf844b27bebafcb64d9947e1896649 to your computer and use it in GitHub Desktop.
A Serilog.ILogEventSink that buffers the input in an in a bounded memory queue, and has a thread pool thread write the event to another sink.
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;
using ServiceStack;
namespace Gene.Diagnostics
{
/// <summary>
/// Provides a continous buffered producer/consumer queue (of specified buffer size) of the specified
/// <see cref="TMessage" />.
/// The producer thread will be async awaited for the consumer, when the number of messages reaches the specified buffer size, to prevent filling up memory.
/// </summary>
public class BufferedQueue<TMessage> : IBufferedQueue<TMessage>
{
private const int DefaultQueueSize = 50;
private BufferBlock<TMessage> queue;
public BufferedQueue()
: this(0)
{
}
public BufferedQueue(int size)
{
Size = ((size > 0) ? size : DefaultQueueSize);
this.queue = new BufferBlock<TMessage>(new DataflowBlockOptions
{
BoundedCapacity = Size,
});
}
public IRecorder Recorder { get; set; }
public System.Threading.Tasks.Task IsComplete
{
get { return this.queue.Completion; }
}
internal int Count
{
get { return this.queue.Count; }
}
public int Size { get; private set; }
public async System.Threading.Tasks.Task ProduceAsync(TMessage message)
{
await this.queue.SendAsync(message);
}
public async System.Threading.Tasks.Task ConsumeAsync(Action<TMessage> action)
{
await ConsumeAsync(action, CancellationToken.None);
}
public async System.Threading.Tasks.Task ConsumeAsync(Action<TMessage> action, CancellationToken cancellation)
{
while (await this.queue.OutputAvailableAsync(cancellation))
{
var message = await this.queue.ReceiveAsync(cancellation);
try
{
if (Recorder != null)
{
Recorder.TraceWithinScope(TraceLevel.Verbose, "BufferedQueue.ConsumeAsync", "Consuming {0}".Fmt(message), () =>
{
action(message);
});
}
else
{
action(message);
}
}
catch (Exception ex)
{
if (Recorder != null)
{
Recorder.Crash("BufferedQueue.ConsumeAsync", CrashLevel.Error, ex);
}
//Ignore exception and continue
}
}
}
public void Complete()
{
this.queue.Complete();
}
}
}
using System;
using Serilog.Core;
using Serilog.Events;
namespace Gene.Diagnostics
{
public class BufferedQueueSink : ILogEventSink
{
private int bufferSize;
private IBufferedQueue<LogEvent> queue;
private ILogEventSink sink;
public BufferedQueueSink(ILogEventSink sink)
: this(sink, 0)
{
}
public BufferedQueueSink(ILogEventSink sink, int bufferSize)
{
Guard.AgainstNull(() => sink, sink);
this.sink = sink;
this.bufferSize = bufferSize;
}
public IRecorder Recorder { get; set; }
public bool ConsumerStarted { get; private set; }
public async void Emit(LogEvent logEvent)
{
EnsureConsumerStarted();
await this.queue.ProduceAsync(logEvent);
}
private void EnsureConsumerStarted()
{
if (ConsumerStarted)
{
return;
}
ConsumerStarted = true;
this.queue = new BufferedQueue<LogEvent>(this.bufferSize)
{
Recorder = Recorder,
};
StartConsumer(logEvent => this.sink.Emit(logEvent));
}
//TODO: To save on multiple threads in the thread pool, move this method to super class,
//that can execute a single thread and consume all instances of this sink in same Task.Run()
//See: Concurrency book, where he runs several tasks concurrently
public void StartConsumer(Action<LogEvent> action)
{
System.Threading.Tasks.Task.Run(async () =>
{
while (true)
{
try
{
await this.queue.ConsumeAsync(action);
}
catch (Exception ex)
{
if (Recorder != null)
{
Recorder.Crash("BufferedQueueSink.Consume", CrashLevel.Error, ex);
}
//Ignore exception and continue
}
}
});
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using Gene.Diagnostics;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Serilog.Core;
using Serilog.Events;
using Serilog.Parsing;
using ServiceStack;
namespace Gene.UnitTests.Diagnostics
{
public class BufferedQueueSinkSpec
{
private static readonly IAssertion Assert = new Assertion();
[TestClass]
public class GivenAContext
{
private BufferedQueueSink sink;
private Mock<ILogEventSink> innerSink;
private Mock<IRecorder> recorder;
[TestInitialize]
public void Initialize()
{
this.recorder = new Mock<IRecorder>();
this.recorder.Setup(rec => rec.TraceWithinScope(It.IsAny<TraceLevel>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Action>()))
.Callback((TraceLevel level, string source, string scope, Action action) =>
{
action();
});
this.innerSink = new Mock<ILogEventSink>();
this.innerSink.Setup(s => s.Emit(It.IsAny<LogEvent>()))
.Callback((LogEvent le) =>
{
});
this.sink = new BufferedQueueSink(this.innerSink.Object)
{
Recorder = this.recorder.Object,
};
}
[TestMethod, TestCategory("Unit")]
public void WhenCtorWithNullSink_ThenThrows()
{
Assert.Throws<ArgumentNullException>(() =>
new BufferedQueueSink(null));
}
[TestMethod, TestCategory("Unit")]
public void WhenEmitAndNotStarted_ThenConsumerStarted()
{
this.sink.Emit(CreateEvent());
Assert.True(this.sink.ConsumerStarted);
}
[TestMethod, TestCategory("Unit")]
public async System.Threading.Tasks.Task WhenEmitSingle_ThenRelaysToInnerSink()
{
var logEvent = CreateEvent();
this.sink.Emit(logEvent);
await System.Threading.Tasks.Task.Delay(TimeSpan.FromSeconds(5));
this.innerSink.Verify(s => s.Emit(logEvent), Times.Once);
this.recorder.Verify(rec => rec.Crash(It.IsAny<string>(), It.IsAny<CrashLevel>(), It.IsAny<Exception>()), Times.Never);
}
[TestMethod, TestCategory("Unit")]
public async System.Threading.Tasks.Task WhenInnerEmitThrows_ThenContinuesRelaysToInnerSink()
{
var exception = new Exception();
this.innerSink.Setup(s => s.Emit(It.IsAny<LogEvent>()))
.Throws(exception);
var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent(),
};
events.Each(e => this.sink.Emit(e));
await System.Threading.Tasks.Task.Delay(TimeSpan.FromSeconds(5));
events.Each(e => this.innerSink.Verify(s => s.Emit(e), Times.Once));
this.recorder.Verify(rec => rec.Crash(It.IsAny<string>(), CrashLevel.Error, exception), Times.Exactly(3));
}
[TestMethod, TestCategory("Unit")]
public async System.Threading.Tasks.Task WhenEmitMultipleTimes_ThenRelaysToInnerSink()
{
var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent(),
};
events.ForEach(e =>
{
this.sink.Emit(e);
});
await System.Threading.Tasks.Task.Delay(TimeSpan.FromSeconds(5));
this.innerSink.Verify(s => s.Emit(It.IsAny<LogEvent>()), Times.Exactly(3));
events.ForEach(e => this.innerSink.Verify(s => s.Emit(e), Times.Once));
this.recorder.Verify(rec => rec.Crash(It.IsAny<string>(), It.IsAny<CrashLevel>(), It.IsAny<Exception>()), Times.Never);
}
private static LogEvent CreateEvent()
{
return new LogEvent(DateTimeOffset.MaxValue, LogEventLevel.Verbose, null, new MessageTemplate("amessage", Enumerable.Empty<MessageTemplateToken>()),
Enumerable.Empty<LogEventProperty>());
}
}
public class TestMessage
{
}
public class TestSink : ILogEventSink
{
public void Emit(LogEvent logEvent)
{
}
}
}
}
using System;
using System.Collections.Generic;
using Gene.Diagnostics;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using ServiceStack;
namespace Gene.UnitTests.Diagnostics
{
public class BufferedQueueSpec
{
private static readonly IAssertion Assert = new Assertion();
public interface IRanAction<in TMessage>
{
void Ran(TMessage message);
}
[TestClass]
public class GivenAContext
{
private BufferedQueue<TestMessage> queue;
private Mock<IRecorder> recorder;
private Mock<IRanAction<TestMessage>> ranner;
[TestInitialize]
public void Initialize()
{
this.recorder = new Mock<IRecorder>();
this.recorder.Setup(rec => rec.TraceWithinScope(It.IsAny<TraceLevel>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Action>()))
.Callback((TraceLevel level, string source, string scope, Action action) =>
{
action();
});
this.ranner = new Mock<IRanAction<TestMessage>>();
this.queue = new BufferedQueue<TestMessage>(5)
{
Recorder = this.recorder.Object,
};
}
[TestMethod, TestCategory("Unit")]
public async System.Threading.Tasks.Task WhenProduceBatchOfOne_ThenReturnsProduced()
{
await this.queue.ProduceAsync(new TestMessage());
Assert.Equal(1, this.queue.Count);
}
[TestMethod, TestCategory("Unit")]
public void WhenProduceBatch_ThenReturnsProduced()
{
var messages = new List<TestMessage>
{
new TestMessage(),
new TestMessage(),
new TestMessage(),
};
messages.ForEach(async msg => await this.queue.ProduceAsync(msg));
Assert.Equal(3, this.queue.Count);
}
[TestMethod, TestCategory("Unit")]
public async System.Threading.Tasks.Task WhenConsumeAsyncAndNoMessages_ThenConsumesNone()
{
var consumer = this.queue.ConsumeAsync(msg => this.ranner.Object.Ran(msg));
this.queue.Complete();
await consumer;
this.ranner.Verify(r => r.Ran(It.IsAny<TestMessage>()), Times.Never);
}
[TestMethod, TestCategory("Unit")]
public async System.Threading.Tasks.Task WhenConsumeAsyncAndActionThrows_ThenConsumesAllProduced()
{
var messages = new List<TestMessage>
{
new TestMessage(),
new TestMessage(),
new TestMessage(),
};
var exception = new Exception();
this.ranner.Setup(r => r.Ran(It.IsAny<TestMessage>()))
.Throws(exception);
messages.ForEach(async msg => await this.queue.ProduceAsync(msg));
var consumer = this.queue.ConsumeAsync(msg => this.ranner.Object.Ran(msg));
this.queue.Complete();
await System.Threading.Tasks.Task.WhenAll(consumer, this.queue.IsComplete);
messages.Each(msg =>
{
this.ranner.Verify(r => r.Ran(msg), Times.Once);
});
this.recorder.Verify(rec => rec.Crash(It.IsAny<string>(), CrashLevel.Error, exception), Times.Exactly(3));
}
[TestMethod, TestCategory("Unit")]
public async System.Threading.Tasks.Task WhenConsumeAsyncWithProducedBatchOfOne_ThenConsumesAllProduced()
{
var message = new TestMessage();
await this.queue.ProduceAsync(message);
var consumer = this.queue.ConsumeAsync(msg => this.ranner.Object.Ran(msg));
this.queue.Complete();
await System.Threading.Tasks.Task.WhenAll(consumer, this.queue.IsComplete);
this.ranner.Verify(r => r.Ran(message), Times.Once);
}
[TestMethod, TestCategory("Unit")]
public async System.Threading.Tasks.Task WhenConsumeAsyncWithProducedBatchSmallerThanBuffer_ThenConsumesAllProduced()
{
var messages = CreateMessages(3);
messages.ForEach(async msg => await this.queue.ProduceAsync(msg));
var consumer = this.queue.ConsumeAsync(msg => this.ranner.Object.Ran(msg));
this.queue.Complete();
await System.Threading.Tasks.Task.WhenAll(consumer, this.queue.IsComplete);
messages.Each(msg =>
{
this.ranner.Verify(r => r.Ran(msg), Times.Once);
});
}
[TestMethod, TestCategory("Unit")]
public async System.Threading.Tasks.Task WhenConsumeAsyncWithProducedBatchSizeOfBuffer_ThenConsumesAllProduced()
{
var messages = CreateMessages(this.queue.Size);
messages.ForEach(async msg => await this.queue.ProduceAsync(msg));
var consumer = this.queue.ConsumeAsync(msg => this.ranner.Object.Ran(msg));
this.queue.Complete();
await System.Threading.Tasks.Task.WhenAll(consumer, this.queue.IsComplete);
messages.Each(msg =>
{
this.ranner.Verify(r => r.Ran(msg), Times.Once);
});
}
[TestMethod, TestCategory("Unit")]
public async System.Threading.Tasks.Task WhenConsumeAsyncWithProducedBatchLargerThanBuffer_ThenConsumesAllProduced()
{
var messages = CreateMessages(this.queue.Size + 1);
messages.ForEach(async msg => await this.queue.ProduceAsync(msg));
var consumer = this.queue.ConsumeAsync(msg => this.ranner.Object.Ran(msg));
this.queue.Complete();
await System.Threading.Tasks.Task.WhenAll(consumer, this.queue.IsComplete);
messages.Each(msg =>
{
this.ranner.Verify(r => r.Ran(msg), Times.Once);
});
}
private static List<TestMessage> CreateMessages(int count)
{
var messages = new List<TestMessage>();
Repeat.This(() =>
{
messages.Add(new TestMessage());
}, count);
return messages;
}
}
public class TestMessage
{
}
}
}
@jezzsantos
Copy link
Author

jezzsantos commented Jul 14, 2016

These are results from a performance test using the BufferedQueueSink sink in desktop environment, where the delegated sink DataStoreSink writes the log event to a db store.

this.logger = new LoggerConfiguration()
                .WriteTo.Sink(new BufferedQueueSink(new DataStoreSink()))
                .CreateLogger();

TestResults

when BufferedQueueSink is not used, and DataStoreSink is used directly:

  • the 'Time To Load' is the time the logging thread spends calling ILog.* method,
  • the 'Time To Process' is the time the logging thread spends writing the event to the sink.

when BufferedQueueSink is used, and relays to the DataStoreSink, then:

  • the 'Time To Load' is the time the logging thread spends calling ILog.* method.
  • the 'Time To Process' is the time the [thread pool] thread spends writing to the sink.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment