Created
March 12, 2023 18:50
-
-
Save aemloviji/0cb1fe36a53fdadf4e6dcd42af250ea1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Наше приложение общается с удаленным сервисом: шлет запросы и получает ответы. С удаленным сервером | |
* установлено единственное соединение, по которому мы шлем запросы и получаем ответы. Каждый запрос содержит Id (GUID), | |
* ответ на запрос содержит его же. Ответы на запросы могут приходить в произвольном порядке и с произвольными задержками. | |
* Нам необходимо реализовать интерфейс, который абстрагирует факт такого мультиплексирования. | |
* Реализация `IRequestProcessor.SendAsync` обязана быть потокобезопасной. | |
* | |
* У нас есть готовая реализация интерфейсов `ILowLevelNetworkAdapter` и `IHighLevelNetworkAdapter` | |
*/ | |
// запрос, остальные поля не интересны | |
using System.Collections.Concurrent; | |
public sealed record Request(Guid Id); | |
// ответ, остальные поля не интересны | |
public sealed record Response(Guid Id); | |
// низкоуровневый адаптер, можно делать одновременный вызов ReadAsync и WriteAsync | |
// можно считать это абстракцией над полнодуплексным сокетом | |
public interface ILowLevelNetworkAdapter | |
{ | |
// вычитывает очередной ответ, нельзя делать несколько одновременных вызовов ReadAsync | |
Task<Response> ReadAsync(CancellationToken cancellationToken); | |
// отправляет запрос, нельзя делать несколько одновременных вызовов WriteAsync | |
Task WriteAsync(Request request, CancellationToken cancellationToken); | |
} | |
// интерфейс, который надо реализовать | |
public interface IRequestProcessor | |
{ | |
// Запускает обработчик. | |
// гарантированно вызывается 1 раз при инициализации приложения | |
Task RunAsync(CancellationToken cancellationToken); | |
// выполняет мягкую остановку, т.е. завершается после завершения обработки всех запросов | |
// гарантированно вызывается 1 раз при остановке приложения | |
Task StopAsync(CancellationToken cancellationToken); | |
// выполняет запрос, этот метод будет вызываться в приложении множеством потоков одновременно | |
// При отмене CancellationToken не обязательно гарантировать то, что мы не отправим запрос на сервер, но клиент должен получить отмену задачи | |
Task<Response> SendAsync(Request request, CancellationToken cancellationToken); | |
} | |
// сложный вариант задачи: | |
// 1. можно пользоваться только ILowLevelNetworkAdapter | |
// 2. нужно реализовать обработку cancellationToken | |
// 3. нужно реализовать StopAsync, который дожидается получения ответов на уже переданные | |
// запросы (пока не отменен переданный в `StopAsync` `CancellationToken`) | |
// 4. нужно реализовать настраиваемый таймаут: если ответ на конкретный запрос не получен за заданный промежуток | |
// времени - отменяем задачу, которая вернулась из `SendAsync`. В том числе надо рассмотреть ситуацию, | |
// что ответ на запрос не придет никогда, глобальный таймаут при этом должен отработать и не допустить утечки памяти | |
public sealed class ComplexRequestProcessor : IRequestProcessor | |
{ | |
private static readonly object _readLock = new(); | |
private static readonly object _writeLock = new(); | |
private readonly ILowLevelNetworkAdapter _networkAdapter; | |
private readonly TimeSpan _requestTimeout; | |
private readonly ConcurrentQueue<Request> _pendingRequestsQueue = new(); | |
private bool _enableThrottle = false; | |
public ComplexRequestProcessor(ILowLevelNetworkAdapter networkAdapter, TimeSpan requestTimeout) | |
{ | |
if (requestTimeout <= TimeSpan.Zero) | |
throw new ArgumentOutOfRangeException(nameof(requestTimeout)); | |
_networkAdapter = networkAdapter; | |
_requestTimeout = requestTimeout; | |
} | |
public Task RunAsync(CancellationToken cancellationToken) | |
{ | |
while (!cancellationToken.IsCancellationRequested) | |
{ | |
var hasPendingRequest = TryPeekPendingRequests(out Request? request); | |
if (!hasPendingRequest) | |
{ | |
continue; | |
} | |
SendRequest(request!, cancellationToken); | |
TryDequeuePendingRequest(); | |
} | |
return Task.CompletedTask; | |
} | |
public Task StopAsync(CancellationToken cancellationToken) | |
{ | |
if (cancellationToken.IsCancellationRequested) | |
{ | |
_enableThrottle = true; | |
} | |
return Task.CompletedTask; | |
} | |
public Task<Response> SendAsync(Request request, CancellationToken cancellationToken) | |
{ | |
void DoIfThrottleNotEnabled(Action action) | |
{ | |
if (!_enableThrottle) | |
{ | |
action(); | |
} | |
} | |
try | |
{ | |
DoIfThrottleNotEnabled( | |
() => SendRequest(request, cancellationToken)); | |
} | |
catch | |
{ | |
DoIfThrottleNotEnabled( | |
() => EnqueuePendingRequests(request)); | |
} | |
return ReadResponseAsync(cancellationToken); | |
} | |
private void EnqueuePendingRequests(Request request) | |
=> _pendingRequestsQueue.Enqueue(request); | |
private bool TryPeekPendingRequests(out Request? request) | |
=> _pendingRequestsQueue.TryPeek(out request); | |
private void TryDequeuePendingRequest() | |
=> _pendingRequestsQueue.TryDequeue(out _); | |
private Task SendRequest(Request request, CancellationToken cancellationToken) | |
{ | |
lock (_writeLock) | |
{ | |
return PeriodicTask.WithTimeout( | |
task: () => _networkAdapter.WriteAsync(request, cancellationToken), | |
timeout: _requestTimeout, | |
cancellationToken); | |
} | |
} | |
private Task<Response> ReadResponseAsync(CancellationToken cancellationToken) | |
{ | |
lock (_readLock) | |
{ | |
return _networkAdapter.ReadAsync(cancellationToken); | |
} | |
} | |
} | |
public static class PeriodicTask | |
{ | |
public static async Task WithTimeout( | |
Func<Task> task, | |
TimeSpan timeout, | |
CancellationToken cancellationToken) | |
{ | |
var delayTask = Task.Delay(timeout, cancellationToken); | |
await Task.WhenAny(task(), delayTask); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment