Last active
August 2, 2023 13:11
-
-
Save in-async/e5a20741ec66fc81836aa88965432165 to your computer and use it in GitHub Desktop.
ログを外部リポジトリに書き込む為のクラス。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Channels; | |
namespace Commons; | |
/// <summary>
/// Sink that accepts log entries of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Type of a single log entry.</typeparam>
public interface ITypedLogger<T> {
    /// <summary>Submits one log entry to the logger.</summary>
    /// <param name="log">The entry to record.</param>
    void Log(T log);
}
/// <summary>
/// Base class for loggers that queue entries in an unbounded in-memory channel and hand them
/// to a single background consumer, which passes each entry to <see cref="WriteAsync"/> and
/// then calls <see cref="FlushAsync"/> once the queue has been drained.
/// </summary>
/// <remarks>
/// Disposing completes the channel and cancels the consumer; entries that are still queued at
/// that moment are not guaranteed to be written.
/// </remarks>
/// <typeparam name="T">Type of a single log entry.</typeparam>
public abstract class TypedLogger<T> : ITypedLogger<T>, IDisposable, IAsyncDisposable {
    private readonly ChannelWriter<T> _writer;
    private readonly Task _consumeTask;
    private readonly CancellationTokenSource _sts = new();
    private readonly ILogger<TypedLogger<T>>? _logger;

    // 0 = live, 1 = shutdown already signalled (updated with Interlocked).
    private int _disposed;

    /// <summary>Creates the channel and starts the background consumer.</summary>
    /// <param name="logger">Optional diagnostics logger for failures of this class itself.</param>
    public TypedLogger(ILogger<TypedLogger<T>>? logger = null) {
        _logger = logger;

        var channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions {
            SingleWriter = false,
            SingleReader = true,
        });
        _writer = channel.Writer;

        // Capture the token up front: CancellationTokenSource.Token throws ObjectDisposedException
        // once the source has been disposed, so reading it lazily inside the task (or inside the
        // exception filter) could turn a normal shutdown into a spurious logged error.
        CancellationToken stoppingToken = _sts.Token;
        _consumeTask = Task.Run(async () => {
            try {
                await ConsumeAsync(channel.Reader, stoppingToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException ex) when (ex.CancellationToken == stoppingToken) {
                // Expected when the logger is disposed.
            }
            catch (Exception ex) {
                logger?.LogError(ex, "An error occurred while consuming logs.\n{ErrorMessage}", ex.Message);
            }
        });
    }

    /// <summary>
    /// Delay the consumer waits after each flush before it looks for further entries.
    /// Defaults to one second.
    /// </summary>
    public TimeSpan ChargeTime { get; init; } = TimeSpan.FromSeconds(1);

    ~TypedLogger() {
        Dispose(disposing: false);
    }

    /// <summary>Stops the consumer and blocks until it has finished.</summary>
    public void Dispose() {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }

    /// <summary>Stops the consumer and asynchronously waits until it has finished.</summary>
    public async ValueTask DisposeAsync() {
        await DisposeAsyncCore().ConfigureAwait(false);
        Dispose(disposing: false);
        GC.SuppressFinalize(this);
    }

    /// <summary>Synchronous part of the dispose pattern.</summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the
    /// finalizer or after <see cref="DisposeAsyncCore"/> has already run.
    /// </param>
    protected virtual void Dispose(bool disposing) {
        // With disposing == false we are either on the finalizer thread, where managed state
        // must not be touched and blocking is unsafe, or DisposeAsync has already awaited the
        // shutdown. In both cases there is nothing left to do here.
        if (!disposing) { return; }

        DisposeAsyncCore().AsTask().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Signals shutdown (first call only) and returns a task that completes when the consumer
    /// has stopped.
    /// </summary>
    protected virtual ValueTask DisposeAsyncCore() {
        if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 0) {
            // No further entries are accepted from here on.
            _writer.TryComplete();
            try {
                _sts.Cancel();
            }
            catch (AggregateException ex) {
                _logger?.LogError(ex, "An error occurred when stopping.\n{ErrorMessage}", ex.Message);
            }
            finally {
                _sts.Dispose();
            }
        }
        return new ValueTask(_consumeTask);
    }

    /// <summary>
    /// Queues <paramref name="log"/> for the background consumer. If the channel rejects the
    /// entry (for example after the logger was disposed) an error is logged instead.
    /// </summary>
    public void Log(T log) {
        if (!_writer.TryWrite(log)) {
            _logger?.LogError("Failed to write log to channel.");
        }
    }

    // Consumer loop: drain everything currently queued, flush once, then pause for ChargeTime.
    private async Task ConsumeAsync(ChannelReader<T> reader, CancellationToken stoppingToken) {
        while (await reader.WaitToReadAsync(stoppingToken).ConfigureAwait(false)) {
            while (reader.TryRead(out T? log)) {
                await WriteAsync(log, stoppingToken).ConfigureAwait(false);
            }
            await FlushAsync(stoppingToken).ConfigureAwait(false);
            await Task.Delay(ChargeTime, stoppingToken).ConfigureAwait(false);
        }
    }

    /// <summary>Handles a single dequeued entry. Called only from the consumer loop.</summary>
    protected abstract ValueTask WriteAsync(T log, CancellationToken token);

    /// <summary>Called by the consumer loop after the queue has been drained.</summary>
    protected abstract Task FlushAsync(CancellationToken token);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Amazon.KinesisFirehose; | |
using Amazon.KinesisFirehose.Model; | |
using Polly; | |
using Polly.Contrib.WaitAndRetry; | |
using Polly.Retry; | |
namespace Commons; | |
/// <summary>
/// <see cref="TypedLogger{T}"/> that serializes each entry to a byte blob, buffers the blobs and
/// sends them to an Amazon Kinesis Data Firehose delivery stream with <c>PutRecordBatch</c>.
/// </summary>
/// <typeparam name="T">Type of a single log entry.</typeparam>
public class FirehoseTypedLogger<T> : TypedLogger<T> {
    private readonly IAmazonKinesisFirehose _firehose;
    private readonly string _streamName;
    private readonly Action<MemoryStream, T> _serialize;
    private readonly ILogger<FirehoseTypedLogger<T>>? _logger;

    // Lazy so that RetryCount, which may be set by an object initializer after the
    // constructor has run, is read on first use rather than during construction.
    private readonly Lazy<AsyncRetryPolicy> _retryPolicy;

    // Serialized records waiting for the next flush, and their total size in bytes.
    private readonly List<byte[]> _buffer = new();
    private long _bufferBytes = 0;

    // Scratch stream handed to the serializer; emptied after every record.
    private readonly MemoryStream _memory = new();

    /// <summary>Creates a logger that writes to the given delivery stream.</summary>
    /// <param name="firehose">Firehose client used to send batches.</param>
    /// <param name="streamName">Name of the delivery stream.</param>
    /// <param name="serialize">Writes one entry into the supplied stream.</param>
    /// <param name="logger">Optional diagnostics logger.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="firehose"/>, <paramref name="streamName"/> or <paramref name="serialize"/> is null.
    /// </exception>
    public FirehoseTypedLogger(IAmazonKinesisFirehose firehose, string streamName, Action<MemoryStream, T> serialize, ILogger<FirehoseTypedLogger<T>>? logger = null)
        : base(logger) {
        _firehose = firehose ?? throw new ArgumentNullException(nameof(firehose));
        _streamName = streamName ?? throw new ArgumentNullException(nameof(streamName));
        _serialize = serialize ?? throw new ArgumentNullException(nameof(serialize));
        _logger = logger;
        _retryPolicy = new(() => Policy
            .Handle<ServiceUnavailableException>()
            .WaitAndRetryAsync(Backoff.DecorrelatedJitterBackoffV2(medianFirstRetryDelay: TimeSpan.FromSeconds(1), RetryCount))
        );

        ChargeTime = TimeSpan.FromSeconds(5);
    }

    // NOTE: https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html

    /// <summary>Maximum number of records buffered before a flush is forced.</summary>
    public int BufferLogs { get; init; } = 500;

    /// <summary>Maximum total size, in bytes, of the buffered records.</summary>
    public long BufferBytes { get; init; } = 4194304; // 4 MiB

    /// <summary>Number of retries applied to a batch that fails with <c>ServiceUnavailableException</c>.</summary>
    public int RetryCount { get; init; } = 5;

    /// <summary>
    /// Serializes <paramref name="log"/> and appends it to the buffer, flushing first when the
    /// buffer cannot take the record. Serialization failures are logged and the entry is skipped.
    /// </summary>
    protected override async ValueTask WriteAsync(T log, CancellationToken token) {
        byte[] blob;
        try {
            _serialize(_memory, log);
            blob = _memory.ToArray();
        }
        catch (Exception ex) {
            _logger?.LogError(ex, "Failed to serialize log.\n{ErrorMessage}", ex.Message);
            return;
        }
        finally {
            // Always empty the scratch stream, including after a failed serialization;
            // otherwise partially written bytes would be prepended to the next record.
            _memory.SetLength(0);
        }

        if (TryAdd(blob)) { return; }

        await FlushAsync(token).ConfigureAwait(false);
        if (!TryAdd(blob)) {
            // Even an empty buffer cannot hold this record (it exceeds the configured limits).
            _logger?.LogError("Failed to buffer log; the record of {RecordBytes} bytes exceeds the buffer limits and was dropped.", blob.Length);
        }
    }

    // Appends the blob unless doing so would exceed the record-count or byte limit.
    private bool TryAdd(byte[] blob) {
        if (_buffer.Count >= BufferLogs) { return false; }

        long recordBytes = blob.Length;
        if (_bufferBytes + recordBytes > BufferBytes) { return false; }

        _buffer.Add(blob);
        _bufferBytes += recordBytes;
        return true;
    }

    /// <summary>
    /// Sends all buffered records in one <c>PutRecordBatch</c> call, retrying records that fail
    /// with <c>ServiceUnavailableException</c>. The buffer is emptied afterwards whether or not
    /// delivery succeeded; failures are logged.
    /// </summary>
    protected override async Task FlushAsync(CancellationToken token) {
        if (_buffer.Count is 0) { return; }

        PutRecordBatchRequest req = new() {
            DeliveryStreamName = _streamName,
            Records = _buffer.Select(t => new Record { Data = new MemoryStream(t) }).ToList(),
        };
        try {
            // NOTE(review): `token` is not forwarded to the Firehose call or the retry policy, so an
            // in-flight batch (including its retries) runs to completion even after cancellation —
            // confirm this is intended before wiring the token through.
            await _retryPolicy.Value.ExecuteAsync(async () => {
                PutRecordBatchResponse res = await _firehose.PutRecordBatchAsync(req).ConfigureAwait(false);
                if (res.FailedPutCount is 0) { return; }

                // Partial failure: response entries are matched to request records by position.
                List<Record> retryRecords = new(capacity: req.Records.Count);
                foreach ((Record record, PutRecordBatchResponseEntry entry) in req.Records.Zip(res.RequestResponses)) {
                    switch (entry.ErrorCode) {
                        case "ServiceUnavailableException":
                            retryRecords.Add(record);
                            break;
                        case { Length: > 0 }:
                            _logger?.LogError("Part of PutRecordBatch failed.\n{ErrorCode}: {ErrorMessage}", entry.ErrorCode, entry.ErrorMessage);
                            break;
                    }
                }
                if (retryRecords.Count is 0) { return; }

                // Narrow the request to the retryable records and let the policy retry it.
                req.Records = retryRecords;
                throw new ServiceUnavailableException(message: null);
            }).ConfigureAwait(false);
        }
        catch (Exception ex) when (ex is not OperationCanceledException || !token.IsCancellationRequested) {
            // Any delivery failure is logged and the batch dropped. Letting it escape would stop
            // the single background consumer for good while callers keep queueing entries.
            _logger?.LogError(ex, "An error occurred during flush.\n{ErrorMessage}", ex.Message);
        }

        _buffer.Clear();
        _bufferBytes = 0;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Diagnostics; | |
using System.Text.Json; | |
using Amazon.KinesisFirehose; | |
using Commons; | |
/// <summary>
/// Firehose-backed logger that records <see cref="Request"/> entries as JSON documents.
/// </summary>
internal sealed class RequestLogger : FirehoseTypedLogger<Request> {
    /// <summary>Creates a request logger that writes to the given delivery stream.</summary>
    public RequestLogger(IAmazonKinesisFirehose firehose, string streamName, ILogger<RequestLogger>? logger = null)
        : base(firehose, streamName, Serialize, logger) {
    }

    // Writes one request as a JSON object with the keys timestamp, traceId, test and ip.
    private static void Serialize(MemoryStream memory, Request request) {
        // NOTE: keep the blob from exceeding 1000 KiB.
        // ref. https://docs.aws.amazon.com/ja_jp/firehose/latest/APIReference/API_Record.html
        // NOTE(review): this delegate is invoked from the base class's background consumer, so
        // the timestamp and Activity.Current are taken when the entry is consumed, presumably not
        // when the request was logged — confirm that is acceptable.
        JsonSerializer.Serialize(memory, new {
            timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
            traceId = Activity.Current?.TraceId.ToString(),
            test = request.Test,
            ip = request.Device?.IPAddress?.ToString(),
        });
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using FluentAssertions; | |
using Microsoft.Extensions.Logging; | |
using Xunit; | |
using Xunit.Abstractions; | |
namespace Commons.Tests; | |
/// <summary>
/// Tests for the dispose and batching behaviour of <see cref="TypedLogger{T}"/>, exercised
/// through an in-memory fake that records what was written and how often it was flushed.
/// </summary>
public sealed class TypedLoggerTests {
    private readonly XunitLogger<FakeTypedLogger> _logger;

    public TypedLoggerTests(ITestOutputHelper output) {
        _logger = new XunitLogger<FakeTypedLogger>(output);
    }

    // Entries logged after Dispose() must not reach the writer.
    [Fact]
    [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE1006:命名スタイル", Justification = "<保留中>")]
    public async Task Dispose() {
        // Arrange
        var sut = new FakeTypedLogger(_logger);

        // Act
        sut.Log(1);
        await SettleAsync().ConfigureAwait(false);
        sut.Dispose();
        sut.Log(2);
        await SettleAsync().ConfigureAwait(false);

        // Assert
        AssertRecorded(sut, new[] { 1 }, expectedFlushCount: 1);
    }

    // Entries logged after DisposeAsync() must not reach the writer.
    [Fact]
    public async Task DisposeAsync() {
        // Arrange
        var sut = new FakeTypedLogger(_logger);

        // Act
        sut.Log(1);
        await SettleAsync().ConfigureAwait(false);
        await sut.DisposeAsync().ConfigureAwait(false);
        sut.Log(2);
        await SettleAsync().ConfigureAwait(false);

        // Assert
        AssertRecorded(sut, new[] { 1 }, expectedFlushCount: 1);
    }

    // With an infinite charge time the second batch is still queued when the logger is disposed.
    [Fact]
    [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE1006:命名スタイル", Justification = "<保留中>")]
    public async Task Log_LongChargeTime() {
        // Arrange
        var sut = new FakeTypedLogger(_logger) {
            ChargeTime = Timeout.InfiniteTimeSpan,
        };
        int[] firstBatch = { 1, 2 };
        int[] secondBatch = { 3 };

        // Act
        Parallel.ForEach(firstBatch, sut.Log);
        await SettleAsync().ConfigureAwait(false);
        Parallel.ForEach(secondBatch, sut.Log);
        await SettleAsync().ConfigureAwait(false);
        await sut.DisposeAsync().ConfigureAwait(false);

        // Assert
        AssertRecorded(sut, firstBatch, expectedFlushCount: 1);
    }

    // With no charge time each batch is written and flushed separately.
    [Fact]
    [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE1006:命名スタイル", Justification = "<保留中>")]
    public async Task Log_ShortChargeTime() {
        // Arrange
        var sut = new FakeTypedLogger(_logger) {
            ChargeTime = TimeSpan.Zero,
        };
        int[] firstBatch = { 1, 2 };
        int[] secondBatch = { 3 };

        // Act
        Parallel.ForEach(firstBatch, sut.Log);
        await SettleAsync().ConfigureAwait(false);
        Parallel.ForEach(secondBatch, sut.Log);
        await SettleAsync().ConfigureAwait(false);
        await sut.DisposeAsync().ConfigureAwait(false);

        // Assert
        AssertRecorded(sut, firstBatch.Concat(secondBatch), expectedFlushCount: 2);
    }

    // Gives the background consumer a moment to process what has been logged so far.
    private static Task SettleAsync() {
        return Task.Delay(20);
    }

    // Checks both the written entries and the number of flushes seen by the fake.
    private static void AssertRecorded(FakeTypedLogger logger, IEnumerable<int> expectedLogs, int expectedFlushCount) {
        logger.ActualWriteLogs.Should().BeEquivalentTo(expectedLogs);
        logger.ActualFlushCount.Should().Be(expectedFlushCount);
    }

    // Fake that records every written entry and counts flushes instead of doing I/O.
    private sealed class FakeTypedLogger : TypedLogger<int> {
        public readonly List<int> ActualWriteLogs = new();
        public int ActualFlushCount = 0;

        public FakeTypedLogger(ILogger<FakeTypedLogger>? logger = null) : base(logger) {
            ChargeTime = TimeSpan.Zero;
        }

        protected override ValueTask WriteAsync(int log, CancellationToken token) {
            ActualWriteLogs.Add(log);
            return default;
        }

        protected override Task FlushAsync(CancellationToken token) {
            ActualFlushCount += 1;
            return Task.CompletedTask;
        }
    }
}
/// <summary>
/// <see cref="ILogger{T}"/> implementation that writes every formatted message to the xUnit
/// test output.
/// </summary>
internal sealed class XunitLogger<T> : ILogger<T> {
    private readonly ITestOutputHelper _output;

    public XunitLogger(ITestOutputHelper output) {
        _output = output;
    }

    // Scopes are not tracked; no scope object is handed out.
    public IDisposable? BeginScope<TState>(TState state) where TState : notnull {
        return null;
    }

    // Every log level is reported as enabled.
    public bool IsEnabled(LogLevel logLevel) {
        return true;
    }

    // Formats the entry with the supplied formatter and writes it as one output line.
    public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter) {
        string message = formatter(state, exception);
        _output.WriteLine(message);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment