Created
July 14, 2016 09:39
-
-
Save jezzsantos/44bf844b27bebafcb64d9947e1896649 to your computer and use it in GitHub Desktop.
A Serilog.ILogEventSink that buffers the input in an in a bounded memory queue, and has a thread pool thread write the event to another sink.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Threading.Tasks.Dataflow; | |
using ServiceStack; | |
namespace Gene.Diagnostics | |
{ | |
/// <summary> | |
/// Provides a continous buffered producer/consumer queue (of specified buffer size) of the specified | |
/// <see cref="TMessage" />. | |
/// The producer thread will be async awaited for the consumer, when the number of messages reaches the specified buffer size, to prevent filling up memory. | |
/// </summary> | |
public class BufferedQueue<TMessage> : IBufferedQueue<TMessage> | |
{ | |
private const int DefaultQueueSize = 50; | |
private BufferBlock<TMessage> queue; | |
public BufferedQueue() | |
: this(0) | |
{ | |
} | |
public BufferedQueue(int size) | |
{ | |
Size = ((size > 0) ? size : DefaultQueueSize); | |
this.queue = new BufferBlock<TMessage>(new DataflowBlockOptions | |
{ | |
BoundedCapacity = Size, | |
}); | |
} | |
public IRecorder Recorder { get; set; } | |
public System.Threading.Tasks.Task IsComplete | |
{ | |
get { return this.queue.Completion; } | |
} | |
internal int Count | |
{ | |
get { return this.queue.Count; } | |
} | |
public int Size { get; private set; } | |
public async System.Threading.Tasks.Task ProduceAsync(TMessage message) | |
{ | |
await this.queue.SendAsync(message); | |
} | |
public async System.Threading.Tasks.Task ConsumeAsync(Action<TMessage> action) | |
{ | |
await ConsumeAsync(action, CancellationToken.None); | |
} | |
public async System.Threading.Tasks.Task ConsumeAsync(Action<TMessage> action, CancellationToken cancellation) | |
{ | |
while (await this.queue.OutputAvailableAsync(cancellation)) | |
{ | |
var message = await this.queue.ReceiveAsync(cancellation); | |
try | |
{ | |
if (Recorder != null) | |
{ | |
Recorder.TraceWithinScope(TraceLevel.Verbose, "BufferedQueue.ConsumeAsync", "Consuming {0}".Fmt(message), () => | |
{ | |
action(message); | |
}); | |
} | |
else | |
{ | |
action(message); | |
} | |
} | |
catch (Exception ex) | |
{ | |
if (Recorder != null) | |
{ | |
Recorder.Crash("BufferedQueue.ConsumeAsync", CrashLevel.Error, ex); | |
} | |
//Ignore exception and continue | |
} | |
} | |
} | |
public void Complete() | |
{ | |
this.queue.Complete(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using Serilog.Core; | |
using Serilog.Events; | |
namespace Gene.Diagnostics | |
{ | |
public class BufferedQueueSink : ILogEventSink | |
{ | |
private int bufferSize; | |
private IBufferedQueue<LogEvent> queue; | |
private ILogEventSink sink; | |
public BufferedQueueSink(ILogEventSink sink) | |
: this(sink, 0) | |
{ | |
} | |
public BufferedQueueSink(ILogEventSink sink, int bufferSize) | |
{ | |
Guard.AgainstNull(() => sink, sink); | |
this.sink = sink; | |
this.bufferSize = bufferSize; | |
} | |
public IRecorder Recorder { get; set; } | |
public bool ConsumerStarted { get; private set; } | |
public async void Emit(LogEvent logEvent) | |
{ | |
EnsureConsumerStarted(); | |
await this.queue.ProduceAsync(logEvent); | |
} | |
private void EnsureConsumerStarted() | |
{ | |
if (ConsumerStarted) | |
{ | |
return; | |
} | |
ConsumerStarted = true; | |
this.queue = new BufferedQueue<LogEvent>(this.bufferSize) | |
{ | |
Recorder = Recorder, | |
}; | |
StartConsumer(logEvent => this.sink.Emit(logEvent)); | |
} | |
//TODO: To save on multiple threads in the thread pool, move this method to super class, | |
//that can execute a single thread and consume all instances of this sink in same Task.Run() | |
//See: Concurrency book, where he runs several tasks concurrently | |
public void StartConsumer(Action<LogEvent> action) | |
{ | |
System.Threading.Tasks.Task.Run(async () => | |
{ | |
while (true) | |
{ | |
try | |
{ | |
await this.queue.ConsumeAsync(action); | |
} | |
catch (Exception ex) | |
{ | |
if (Recorder != null) | |
{ | |
Recorder.Crash("BufferedQueueSink.Consume", CrashLevel.Error, ex); | |
} | |
//Ignore exception and continue | |
} | |
} | |
}); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using Gene.Diagnostics; | |
using Microsoft.VisualStudio.TestTools.UnitTesting; | |
using Moq; | |
using Serilog.Core; | |
using Serilog.Events; | |
using Serilog.Parsing; | |
using ServiceStack; | |
namespace Gene.UnitTests.Diagnostics | |
{ | |
public class BufferedQueueSinkSpec | |
{ | |
private static readonly IAssertion Assert = new Assertion(); | |
[TestClass] | |
public class GivenAContext | |
{ | |
private BufferedQueueSink sink; | |
private Mock<ILogEventSink> innerSink; | |
private Mock<IRecorder> recorder; | |
[TestInitialize] | |
public void Initialize() | |
{ | |
this.recorder = new Mock<IRecorder>(); | |
this.recorder.Setup(rec => rec.TraceWithinScope(It.IsAny<TraceLevel>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Action>())) | |
.Callback((TraceLevel level, string source, string scope, Action action) => | |
{ | |
action(); | |
}); | |
this.innerSink = new Mock<ILogEventSink>(); | |
this.innerSink.Setup(s => s.Emit(It.IsAny<LogEvent>())) | |
.Callback((LogEvent le) => | |
{ | |
}); | |
this.sink = new BufferedQueueSink(this.innerSink.Object) | |
{ | |
Recorder = this.recorder.Object, | |
}; | |
} | |
[TestMethod, TestCategory("Unit")] | |
public void WhenCtorWithNullSink_ThenThrows() | |
{ | |
Assert.Throws<ArgumentNullException>(() => | |
new BufferedQueueSink(null)); | |
} | |
[TestMethod, TestCategory("Unit")] | |
public void WhenEmitAndNotStarted_ThenConsumerStarted() | |
{ | |
this.sink.Emit(CreateEvent()); | |
Assert.True(this.sink.ConsumerStarted); | |
} | |
[TestMethod, TestCategory("Unit")] | |
public async System.Threading.Tasks.Task WhenEmitSingle_ThenRelaysToInnerSink() | |
{ | |
var logEvent = CreateEvent(); | |
this.sink.Emit(logEvent); | |
await System.Threading.Tasks.Task.Delay(TimeSpan.FromSeconds(5)); | |
this.innerSink.Verify(s => s.Emit(logEvent), Times.Once); | |
this.recorder.Verify(rec => rec.Crash(It.IsAny<string>(), It.IsAny<CrashLevel>(), It.IsAny<Exception>()), Times.Never); | |
} | |
[TestMethod, TestCategory("Unit")] | |
public async System.Threading.Tasks.Task WhenInnerEmitThrows_ThenContinuesRelaysToInnerSink() | |
{ | |
var exception = new Exception(); | |
this.innerSink.Setup(s => s.Emit(It.IsAny<LogEvent>())) | |
.Throws(exception); | |
var events = new List<LogEvent> | |
{ | |
CreateEvent(), | |
CreateEvent(), | |
CreateEvent(), | |
}; | |
events.Each(e => this.sink.Emit(e)); | |
await System.Threading.Tasks.Task.Delay(TimeSpan.FromSeconds(5)); | |
events.Each(e => this.innerSink.Verify(s => s.Emit(e), Times.Once)); | |
this.recorder.Verify(rec => rec.Crash(It.IsAny<string>(), CrashLevel.Error, exception), Times.Exactly(3)); | |
} | |
[TestMethod, TestCategory("Unit")] | |
public async System.Threading.Tasks.Task WhenEmitMultipleTimes_ThenRelaysToInnerSink() | |
{ | |
var events = new List<LogEvent> | |
{ | |
CreateEvent(), | |
CreateEvent(), | |
CreateEvent(), | |
}; | |
events.ForEach(e => | |
{ | |
this.sink.Emit(e); | |
}); | |
await System.Threading.Tasks.Task.Delay(TimeSpan.FromSeconds(5)); | |
this.innerSink.Verify(s => s.Emit(It.IsAny<LogEvent>()), Times.Exactly(3)); | |
events.ForEach(e => this.innerSink.Verify(s => s.Emit(e), Times.Once)); | |
this.recorder.Verify(rec => rec.Crash(It.IsAny<string>(), It.IsAny<CrashLevel>(), It.IsAny<Exception>()), Times.Never); | |
} | |
private static LogEvent CreateEvent() | |
{ | |
return new LogEvent(DateTimeOffset.MaxValue, LogEventLevel.Verbose, null, new MessageTemplate("amessage", Enumerable.Empty<MessageTemplateToken>()), | |
Enumerable.Empty<LogEventProperty>()); | |
} | |
} | |
public class TestMessage | |
{ | |
} | |
public class TestSink : ILogEventSink | |
{ | |
public void Emit(LogEvent logEvent) | |
{ | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using Gene.Diagnostics; | |
using Microsoft.VisualStudio.TestTools.UnitTesting; | |
using Moq; | |
using ServiceStack; | |
namespace Gene.UnitTests.Diagnostics | |
{ | |
public class BufferedQueueSpec | |
{ | |
private static readonly IAssertion Assert = new Assertion(); | |
public interface IRanAction<in TMessage> | |
{ | |
void Ran(TMessage message); | |
} | |
[TestClass] | |
public class GivenAContext | |
{ | |
private BufferedQueue<TestMessage> queue; | |
private Mock<IRecorder> recorder; | |
private Mock<IRanAction<TestMessage>> ranner; | |
[TestInitialize] | |
public void Initialize() | |
{ | |
this.recorder = new Mock<IRecorder>(); | |
this.recorder.Setup(rec => rec.TraceWithinScope(It.IsAny<TraceLevel>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Action>())) | |
.Callback((TraceLevel level, string source, string scope, Action action) => | |
{ | |
action(); | |
}); | |
this.ranner = new Mock<IRanAction<TestMessage>>(); | |
this.queue = new BufferedQueue<TestMessage>(5) | |
{ | |
Recorder = this.recorder.Object, | |
}; | |
} | |
[TestMethod, TestCategory("Unit")] | |
public async System.Threading.Tasks.Task WhenProduceBatchOfOne_ThenReturnsProduced() | |
{ | |
await this.queue.ProduceAsync(new TestMessage()); | |
Assert.Equal(1, this.queue.Count); | |
} | |
[TestMethod, TestCategory("Unit")] | |
public void WhenProduceBatch_ThenReturnsProduced() | |
{ | |
var messages = new List<TestMessage> | |
{ | |
new TestMessage(), | |
new TestMessage(), | |
new TestMessage(), | |
}; | |
messages.ForEach(async msg => await this.queue.ProduceAsync(msg)); | |
Assert.Equal(3, this.queue.Count); | |
} | |
[TestMethod, TestCategory("Unit")] | |
public async System.Threading.Tasks.Task WhenConsumeAsyncAndNoMessages_ThenConsumesNone() | |
{ | |
var consumer = this.queue.ConsumeAsync(msg => this.ranner.Object.Ran(msg)); | |
this.queue.Complete(); | |
await consumer; | |
this.ranner.Verify(r => r.Ran(It.IsAny<TestMessage>()), Times.Never); | |
} | |
[TestMethod, TestCategory("Unit")] | |
public async System.Threading.Tasks.Task WhenConsumeAsyncAndActionThrows_ThenConsumesAllProduced() | |
{ | |
var messages = new List<TestMessage> | |
{ | |
new TestMessage(), | |
new TestMessage(), | |
new TestMessage(), | |
}; | |
var exception = new Exception(); | |
this.ranner.Setup(r => r.Ran(It.IsAny<TestMessage>())) | |
.Throws(exception); | |
messages.ForEach(async msg => await this.queue.ProduceAsync(msg)); | |
var consumer = this.queue.ConsumeAsync(msg => this.ranner.Object.Ran(msg)); | |
this.queue.Complete(); | |
await System.Threading.Tasks.Task.WhenAll(consumer, this.queue.IsComplete); | |
messages.Each(msg => | |
{ | |
this.ranner.Verify(r => r.Ran(msg), Times.Once); | |
}); | |
this.recorder.Verify(rec => rec.Crash(It.IsAny<string>(), CrashLevel.Error, exception), Times.Exactly(3)); | |
} | |
[TestMethod, TestCategory("Unit")] | |
public async System.Threading.Tasks.Task WhenConsumeAsyncWithProducedBatchOfOne_ThenConsumesAllProduced() | |
{ | |
var message = new TestMessage(); | |
await this.queue.ProduceAsync(message); | |
var consumer = this.queue.ConsumeAsync(msg => this.ranner.Object.Ran(msg)); | |
this.queue.Complete(); | |
await System.Threading.Tasks.Task.WhenAll(consumer, this.queue.IsComplete); | |
this.ranner.Verify(r => r.Ran(message), Times.Once); | |
} | |
[TestMethod, TestCategory("Unit")] | |
public async System.Threading.Tasks.Task WhenConsumeAsyncWithProducedBatchSmallerThanBuffer_ThenConsumesAllProduced() | |
{ | |
var messages = CreateMessages(3); | |
messages.ForEach(async msg => await this.queue.ProduceAsync(msg)); | |
var consumer = this.queue.ConsumeAsync(msg => this.ranner.Object.Ran(msg)); | |
this.queue.Complete(); | |
await System.Threading.Tasks.Task.WhenAll(consumer, this.queue.IsComplete); | |
messages.Each(msg => | |
{ | |
this.ranner.Verify(r => r.Ran(msg), Times.Once); | |
}); | |
} | |
[TestMethod, TestCategory("Unit")] | |
public async System.Threading.Tasks.Task WhenConsumeAsyncWithProducedBatchSizeOfBuffer_ThenConsumesAllProduced() | |
{ | |
var messages = CreateMessages(this.queue.Size); | |
messages.ForEach(async msg => await this.queue.ProduceAsync(msg)); | |
var consumer = this.queue.ConsumeAsync(msg => this.ranner.Object.Ran(msg)); | |
this.queue.Complete(); | |
await System.Threading.Tasks.Task.WhenAll(consumer, this.queue.IsComplete); | |
messages.Each(msg => | |
{ | |
this.ranner.Verify(r => r.Ran(msg), Times.Once); | |
}); | |
} | |
[TestMethod, TestCategory("Unit")] | |
public async System.Threading.Tasks.Task WhenConsumeAsyncWithProducedBatchLargerThanBuffer_ThenConsumesAllProduced() | |
{ | |
var messages = CreateMessages(this.queue.Size + 1); | |
messages.ForEach(async msg => await this.queue.ProduceAsync(msg)); | |
var consumer = this.queue.ConsumeAsync(msg => this.ranner.Object.Ran(msg)); | |
this.queue.Complete(); | |
await System.Threading.Tasks.Task.WhenAll(consumer, this.queue.IsComplete); | |
messages.Each(msg => | |
{ | |
this.ranner.Verify(r => r.Ran(msg), Times.Once); | |
}); | |
} | |
private static List<TestMessage> CreateMessages(int count) | |
{ | |
var messages = new List<TestMessage>(); | |
Repeat.This(() => | |
{ | |
messages.Add(new TestMessage()); | |
}, count); | |
return messages; | |
} | |
} | |
public class TestMessage | |
{ | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
These are results from a performance test using the
BufferedQueueSink
sink in desktop environment, where the delegated sinkDataStoreSink
writes the log event to a db store.TestResults
when
BufferedQueueSink
is not used, and DataStoreSink is used directly:when
BufferedQueueSink
is used, and relays to theDataStoreSink
, then: