Last active
March 12, 2023 17:49
-
-
Save aemloviji/f0389388a0e5ce71f948bf7494b9b6e7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Наше приложение общается с удаленным сервисом: шлет запросы и получает ответы. С удаленным сервером | |
* установлено единственное соединение, по которому мы шлем запросы и получаем ответы. Каждый запрос содержит Id (GUID), | |
* ответ на запрос содержит его же. Ответы на запросы могут приходить в произвольном порядке и с произвольными задержками. | |
* Нам необходимо реализовать интерфейс, который абстрагирует факт такого мультиплексирования. | |
* Реализация `IRequestProcessor.SendAsync` обязана быть потокобезопасной. | |
* | |
* У нас есть готовая реализация интерфейсов `ILowLevelNetworkAdapter` и `IHighLevelNetworkAdapter` | |
*/ | |
// запрос, остальные поля не интересны | |
using System.Collections.Concurrent; | |
public sealed record Request(Guid Id); | |
// ответ, остальные поля не интересны | |
public sealed record Response(Guid Id); | |
// высокоуровневый адаптер с внутренней очередью отправки сообщений, наподобие клиента RabbitMQ или Kafka | |
public interface IHighLevelNetworkAdapter | |
{ | |
// ставит очередной запрос в очередь на отправку. false - очередь переполнена, запрос не будет отправлен | |
bool TryEnqueueWrite(Request request, CancellationToken cancellationToken); | |
// вычитывает очередной ответ, нельзя делать несколько одновременных вызовов ReadAsync | |
Task<Response> ReadAsync(CancellationToken cancellationToken); | |
} | |
// интерфейс, который надо реализовать | |
public interface IRequestProcessor | |
{ | |
// Запускает обработчик. | |
// гарантированно вызывается 1 раз при инициализации приложения | |
Task RunAsync(CancellationToken cancellationToken); | |
// выполняет мягкую остановку, т.е. завершается после завершения обработки всех запросов | |
// гарантированно вызывается 1 раз при остановке приложения | |
Task StopAsync(CancellationToken cancellationToken); | |
// выполняет запрос, этот метод будет вызываться в приложении множеством потоков одновременно | |
// При отмене CancellationToken не обязательно гарантировать то, что мы не отправим запрос на сервер, но клиент должен получить отмену задачи | |
Task<Response> SendAsync(Request request, CancellationToken cancellationToken); | |
} | |
// простой вариант задачи: | |
// 1. Можно пользоваться IHighLevelNetworkAdapter, т.е. переложить очередь отправки на NetworkAdapter | |
// 2. Можно не реализовывать обработку `CancellationToken` в методах `IRequestProcessor` | |
// 3. Можно не реализовывать `IRequestProcessor.StopAsync` | |
public sealed class SimpleRequestProcessor : IRequestProcessor | |
{ | |
private static readonly object _readLock = new(); | |
private readonly IHighLevelNetworkAdapter _networkAdapter; | |
private readonly ConcurrentQueue<Request> _pendingRequestsQueue = new(); | |
public SimpleRequestProcessor(IHighLevelNetworkAdapter networkAdapter) | |
{ | |
_networkAdapter = networkAdapter; | |
} | |
public Task RunAsync(CancellationToken cancellationToken) | |
{ | |
while (!cancellationToken.IsCancellationRequested) | |
{ | |
var hasPendingRequest = TryPeekPendingRequests(out Request? request); | |
if (!hasPendingRequest) | |
{ | |
continue; | |
} | |
var failedToSend = TrySendRequest(request!, cancellationToken); | |
if (!failedToSend) | |
{ | |
TryDequeuePendingRequest(); | |
} | |
} | |
return Task.CompletedTask; | |
} | |
public Task StopAsync(CancellationToken cancellationToken) | |
{ | |
// этот метод можно не реализовывать | |
return Task.CompletedTask; | |
} | |
public Task<Response> SendAsync(Request request, CancellationToken cancellationToken) | |
{ | |
var failedToSend = TrySendRequest(request, cancellationToken); | |
if (failedToSend) | |
{ | |
EnqueuePendingRequests(request); | |
} | |
return ReadResponseAsync(cancellationToken); | |
} | |
private void EnqueuePendingRequests(Request request) | |
=> _pendingRequestsQueue.Enqueue(request); | |
private bool TryPeekPendingRequests(out Request? request) | |
=> _pendingRequestsQueue.TryPeek(out request); | |
private void TryDequeuePendingRequest() | |
=> _pendingRequestsQueue.TryDequeue(out _); | |
private bool TrySendRequest(Request request, CancellationToken cancellationToken) | |
=> _networkAdapter.TryEnqueueWrite(request, cancellationToken); | |
private Task<Response> ReadResponseAsync(CancellationToken cancellationToken) | |
{ | |
lock (_readLock) | |
{ | |
return _networkAdapter.ReadAsync(cancellationToken); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment