Skip to content

Instantly share code, notes, and snippets.

@in-async
Last active August 2, 2023 13:11
Show Gist options
  • Save in-async/e5a20741ec66fc81836aa88965432165 to your computer and use it in GitHub Desktop.
Save in-async/e5a20741ec66fc81836aa88965432165 to your computer and use it in GitHub Desktop.
ログを外部リポジトリに書き込む為のクラス。
using System.Threading.Channels;
namespace Commons;
public interface ITypedLogger<T> {
void Log(T log);
}
public abstract class TypedLogger<T> : ITypedLogger<T>, IDisposable, IAsyncDisposable {
private readonly ChannelWriter<T> _writer;
private readonly Task _consumeTask;
private readonly CancellationTokenSource _sts = new();
private readonly ILogger<TypedLogger<T>>? _logger;
public TypedLogger(ILogger<TypedLogger<T>>? logger = null) {
_logger = logger;
var channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions {
SingleWriter = false,
SingleReader = true,
});
_writer = channel.Writer;
_consumeTask = Task.Run(async () => {
try {
await ConsumeAsync(channel.Reader, _sts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException ex) when (ex.CancellationToken == _sts.Token) { }
catch (Exception ex) {
logger?.LogError(ex, null);
}
});
}
public TimeSpan ChargeTime { get; init; } = TimeSpan.FromSeconds(1);
~TypedLogger() {
Dispose(disposing: false);
}
public void Dispose() {
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
public async ValueTask DisposeAsync() {
await DisposeAsyncCore().ConfigureAwait(false);
Dispose(disposing: false);
GC.SuppressFinalize(this);
}
private int _disposed;
protected virtual void Dispose(bool disposing) {
DisposeAsyncCore().AsTask().GetAwaiter().GetResult();
}
protected virtual ValueTask DisposeAsyncCore() {
if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 0) {
_writer.TryComplete();
try {
_sts.Cancel();
}
catch (AggregateException ex) {
_logger?.LogError(ex, "An error occurred when stopping.\n{ErrorMessage}", ex.Message);
}
finally {
_sts.Dispose();
}
}
return new ValueTask(_consumeTask);
}
public void Log(T log) {
if (!_writer.TryWrite(log)) {
_logger?.LogError("Failed to write log to channel.");
}
}
private async Task ConsumeAsync(ChannelReader<T> reader, CancellationToken stoppingToken) {
while (await reader.WaitToReadAsync(stoppingToken).ConfigureAwait(false)) {
while (reader.TryRead(out T? log)) {
await WriteAsync(log, stoppingToken).ConfigureAwait(false);
}
await FlushAsync(stoppingToken).ConfigureAwait(false);
await Task.Delay(ChargeTime, stoppingToken).ConfigureAwait(false);
}
}
protected abstract ValueTask WriteAsync(T log, CancellationToken token);
protected abstract Task FlushAsync(CancellationToken token);
}
using Amazon.KinesisFirehose;
using Amazon.KinesisFirehose.Model;
using Polly;
using Polly.Contrib.WaitAndRetry;
using Polly.Retry;
namespace Commons;
public class FirehoseTypedLogger<T> : TypedLogger<T> {
private readonly IAmazonKinesisFirehose _firehose;
private readonly string _streamName;
private readonly Action<MemoryStream, T> _serialize;
private readonly ILogger<FirehoseTypedLogger<T>>? _logger;
private readonly Lazy<AsyncRetryPolicy> _retryPolicy;
public FirehoseTypedLogger(IAmazonKinesisFirehose firehose, string streamName, Action<MemoryStream, T> serialize, ILogger<FirehoseTypedLogger<T>>? logger = null)
: base(logger) {
_firehose = firehose ?? throw new ArgumentNullException(nameof(firehose));
_streamName = streamName ?? throw new ArgumentNullException(nameof(streamName));
_serialize = serialize ?? throw new ArgumentNullException(nameof(serialize));
_logger = logger;
_retryPolicy = new(() => Policy
.Handle<ServiceUnavailableException>()
.WaitAndRetryAsync(Backoff.DecorrelatedJitterBackoffV2(medianFirstRetryDelay: TimeSpan.FromSeconds(1), RetryCount))
);
ChargeTime = TimeSpan.FromSeconds(5);
}
// NOTE: https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
public int BufferLogs { get; init; } = 500;
public long BufferBytes { get; init; } = 4194304; // 4 MiB
public int RetryCount { get; init; } = 5;
private readonly List<byte[]> _buffer = new();
private long _bufferBytes = 0;
private readonly MemoryStream _memory = new();
protected override async ValueTask WriteAsync(T log, CancellationToken token) {
try {
_serialize(_memory, log);
}
catch (Exception ex) {
_logger?.LogError(ex, "Failed to serialize log.\n{ErrorMessage}", ex.Message);
return;
}
byte[] blob = _memory.ToArray();
_memory.SetLength(0);
if (!TryAdd(blob)) {
await FlushAsync(token).ConfigureAwait(false);
TryAdd(blob);
}
}
private bool TryAdd(byte[] blob) {
if (_buffer.Count >= BufferLogs) { return false; }
long recordBytes = blob.Length;
if (_bufferBytes + recordBytes > BufferBytes) { return false; }
_buffer.Add(blob);
_bufferBytes += recordBytes;
return true;
}
protected override async Task FlushAsync(CancellationToken token) {
if (_buffer.Count is 0) { return; }
PutRecordBatchRequest req = new() {
DeliveryStreamName = _streamName,
Records = _buffer.Select(t => new Record { Data = new MemoryStream(t) }).ToList(),
};
try {
await _retryPolicy.Value.ExecuteAsync(async () => {
PutRecordBatchResponse res = await _firehose.PutRecordBatchAsync(req).ConfigureAwait(false);
if (res.FailedPutCount is 0) { return; }
List<Record> retryRecords = new(capacity: req.Records.Count);
foreach ((Record record, PutRecordBatchResponseEntry entry) in req.Records.Zip(res.RequestResponses)) {
switch (entry.ErrorCode) {
case "ServiceUnavailableException":
retryRecords.Add(record);
break;
case { Length: > 0 }:
_logger?.LogError("Part of PutRecordBatch failed.\n{ErrorCode}: {ErrorMessage}", entry.ErrorCode, entry.ErrorMessage);
break;
}
}
if (retryRecords.Count is 0) { return; }
req.Records = retryRecords;
throw new ServiceUnavailableException(message: null);
}).ConfigureAwait(false);
}
catch (ServiceUnavailableException ex) {
_logger?.LogError(ex, "An error occurred during flush.\n{ErrorMessage}", ex.Message);
}
_buffer.Clear();
_bufferBytes = 0;
}
}
using System.Diagnostics;
using System.Text.Json;
using Amazon.KinesisFirehose;
using Commons;
internal sealed class RequestLogger : FirehoseTypedLogger<Request> {
public RequestLogger(IAmazonKinesisFirehose firehose, string streamName, ILogger<RequestLogger>? logger = null)
: base(firehose, streamName, Serialize, logger) { }
private static void Serialize(MemoryStream memory, Request request) {
var log = new {
timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
traceId = Activity.Current?.TraceId.ToString(),
test = request.Test,
ip = request.Device?.IPAddress?.ToString(),
};
// NOTE: blob が 1000 KiB を超えないように。
// ref. https://docs.aws.amazon.com/ja_jp/firehose/latest/APIReference/API_Record.html
JsonSerializer.Serialize(memory, log);
}
}
using FluentAssertions;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;
namespace Commons.Tests;
public sealed class TypedLoggerTests {
private readonly XunitLogger<FakeTypedLogger> _logger;
public TypedLoggerTests(ITestOutputHelper output) => _logger = new(output);
[Fact]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE1006:命名スタイル", Justification = "<保留中>")]
public async Task Dispose() {
// Arrange
FakeTypedLogger typedLogger = new(_logger);
// Act
typedLogger.Log(1);
await Task.Delay(20).ConfigureAwait(false);
typedLogger.Dispose();
typedLogger.Log(2);
await Task.Delay(20).ConfigureAwait(false);
// Assert
typedLogger.ActualWriteLogs.Should().BeEquivalentTo(new[] { 1 });
typedLogger.ActualFlushCount.Should().Be(1);
}
[Fact]
public async Task DisposeAsync() {
// Arrange
FakeTypedLogger typedLogger = new(_logger);
// Act
typedLogger.Log(1);
await Task.Delay(20).ConfigureAwait(false);
await typedLogger.DisposeAsync().ConfigureAwait(false);
typedLogger.Log(2);
await Task.Delay(20).ConfigureAwait(false);
// Assert
typedLogger.ActualWriteLogs.Should().BeEquivalentTo(new[] { 1 });
typedLogger.ActualFlushCount.Should().Be(1);
}
[Fact]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE1006:命名スタイル", Justification = "<保留中>")]
public async Task Log_LongChargeTime() {
// Arrange
FakeTypedLogger typedLogger = new(_logger) {
ChargeTime = Timeout.InfiniteTimeSpan,
};
int[] logs1 = new[] { 1, 2 };
int[] logs2 = new[] { 3 };
// Act
Parallel.ForEach(logs1, typedLogger.Log);
await Task.Delay(20).ConfigureAwait(false);
Parallel.ForEach(logs2, typedLogger.Log);
await Task.Delay(20).ConfigureAwait(false);
await typedLogger.DisposeAsync().ConfigureAwait(false);
// Assert
typedLogger.ActualWriteLogs.Should().BeEquivalentTo(logs1);
typedLogger.ActualFlushCount.Should().Be(1);
}
[Fact]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE1006:命名スタイル", Justification = "<保留中>")]
public async Task Log_ShortChargeTime() {
// Arrange
FakeTypedLogger typedLogger = new(_logger) {
ChargeTime = TimeSpan.Zero,
};
int[] logs1 = new[] { 1, 2 };
int[] logs2 = new[] { 3 };
// Act
Parallel.ForEach(logs1, typedLogger.Log);
await Task.Delay(20).ConfigureAwait(false);
Parallel.ForEach(logs2, typedLogger.Log);
await Task.Delay(20).ConfigureAwait(false);
await typedLogger.DisposeAsync().ConfigureAwait(false);
// Assert
typedLogger.ActualWriteLogs.Should().BeEquivalentTo(logs1.Concat(logs2));
typedLogger.ActualFlushCount.Should().Be(2);
}
private sealed class FakeTypedLogger : TypedLogger<int> {
public FakeTypedLogger(ILogger<FakeTypedLogger>? logger = null) : base(logger) {
ChargeTime = TimeSpan.Zero;
}
public readonly List<int> ActualWriteLogs = new();
public int ActualFlushCount = 0;
protected override ValueTask WriteAsync(int log, CancellationToken token) {
ActualWriteLogs.Add(log);
return default;
}
protected override Task FlushAsync(CancellationToken token) {
ActualFlushCount++;
return Task.CompletedTask;
}
}
}
internal sealed class XunitLogger<T> : ILogger<T> {
private readonly ITestOutputHelper _output;
public XunitLogger(ITestOutputHelper output) => _output = output;
public IDisposable? BeginScope<TState>(TState state) where TState : notnull => null;
public bool IsEnabled(LogLevel logLevel) => true;
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter) {
_output.WriteLine(formatter(state, exception));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment