Last active
March 12, 2023 18:12
-
-
Save aemloviji/5cfa84fcfaac5c5ab3089d0584bbe12a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Наше приложение общается с удаленным сервисом: шлет запросы и получает ответы. С удаленным сервером | |
* установлено единственное соединение, по которому мы шлем запросы и получаем ответы. Каждый запрос содержит Id (GUID), | |
* ответ на запрос содержит его же. Ответы на запросы могут приходить в произвольном порядке и с произвольными задержками. | |
* Нам необходимо реализовать интерфейс, который абстрагирует факт такого мультиплексирования. | |
* Реализация `IRequestProcessor.SendAsync` обязана быть потокобезопасной. | |
* | |
* У нас есть готовая реализация интерфейсов `ILowLevelNetworkAdapter` и `IHighLevelNetworkAdapter` | |
*/ | |
// запрос, остальные поля не интересны | |
using System.Collections.Concurrent; | |
public sealed record Request(Guid Id); | |
// ответ, остальные поля не интересны | |
public sealed record Response(Guid Id); | |
// низкоуровневый адаптер, можно делать одновременный вызов ReadAsync и WriteAsync | |
// можно считать это абстракцией над полнодуплексным сокетом | |
public interface ILowLevelNetworkAdapter | |
{ | |
// вычитывает очередной ответ, нельзя делать несколько одновременных вызовов ReadAsync | |
Task<Response> ReadAsync(CancellationToken cancellationToken); | |
// отправляет запрос, нельзя делать несколько одновременных вызовов WriteAsync | |
Task WriteAsync(Request request, CancellationToken cancellationToken); | |
} | |
// интерфейс, который надо реализовать | |
public interface IRequestProcessor | |
{ | |
// Запускает обработчик. | |
// гарантированно вызывается 1 раз при инициализации приложения | |
Task RunAsync(CancellationToken cancellationToken); | |
// выполняет мягкую остановку, т.е. завершается после завершения обработки всех запросов | |
// гарантированно вызывается 1 раз при остановке приложения | |
Task StopAsync(CancellationToken cancellationToken); | |
// выполняет запрос, этот метод будет вызываться в приложении множеством потоков одновременно | |
// При отмене CancellationToken не обязательно гарантировать то, что мы не отправим запрос на сервер, но клиент должен получить отмену задачи | |
Task<Response> SendAsync(Request request, CancellationToken cancellationToken); | |
} | |
// средний вариант задачи: | |
// 1. можно пользоваться только ILowLevelNetworkAdapter | |
// 2. нужно реализовать обработку cancellationToken | |
// 3. можно не реализовывать `IRequestProcessor.StopAsync` | |
public sealed class MediumRequestProcessor : IRequestProcessor | |
{ | |
private static readonly object _readLock = new(); | |
private static readonly object _writeLock = new(); | |
private readonly ILowLevelNetworkAdapter _networkAdapter; | |
private readonly ConcurrentQueue<Request> _pendingRequestsQueue = new(); | |
public MediumRequestProcessor(ILowLevelNetworkAdapter networkAdapter) | |
{ | |
_networkAdapter = networkAdapter; | |
} | |
public Task RunAsync(CancellationToken cancellationToken) | |
{ | |
while (!cancellationToken.IsCancellationRequested) | |
{ | |
var hasPendingRequest = TryPeekPendingRequests(out Request? request); | |
if (!hasPendingRequest) | |
{ | |
continue; | |
} | |
SendRequest(request!, cancellationToken); | |
TryDequeuePendingRequest(); | |
} | |
return Task.CompletedTask; | |
} | |
public Task StopAsync(CancellationToken cancellationToken) | |
{ | |
// этот метод можно не реализовывать | |
return Task.CompletedTask; | |
} | |
public Task<Response> SendAsync(Request request, CancellationToken cancellationToken) | |
{ | |
try | |
{ | |
SendRequest(request, cancellationToken); | |
} | |
catch | |
{ | |
EnqueuePendingRequests(request); | |
} | |
return ReadResponseAsync(cancellationToken); | |
} | |
private void EnqueuePendingRequests(Request request) | |
=> _pendingRequestsQueue.Enqueue(request); | |
private bool TryPeekPendingRequests(out Request? request) | |
=> _pendingRequestsQueue.TryPeek(out request); | |
private void TryDequeuePendingRequest() | |
=> _pendingRequestsQueue.TryDequeue(out _); | |
private Task SendRequest(Request request, CancellationToken cancellationToken) | |
{ | |
lock (_writeLock) | |
{ | |
return _networkAdapter.WriteAsync(request, cancellationToken); | |
} | |
} | |
private Task<Response> ReadResponseAsync(CancellationToken cancellationToken) | |
{ | |
lock (_readLock) | |
{ | |
return _networkAdapter.ReadAsync(cancellationToken); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment