Skip to content

Instantly share code, notes, and snippets.

@aemloviji
Last active March 12, 2023 17:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aemloviji/f0389388a0e5ce71f948bf7494b9b6e7 to your computer and use it in GitHub Desktop.
Save aemloviji/f0389388a0e5ce71f948bf7494b9b6e7 to your computer and use it in GitHub Desktop.
/*
* Наше приложение общается с удаленным сервисом: шлет запросы и получает ответы. С удаленным сервером
* установлено единственное соединение, по которому мы шлем запросы и получаем ответы. Каждый запрос содержит Id (GUID),
* ответ на запрос содержит его же. Ответы на запросы могут приходить в произвольном порядке и с произвольными задержками.
* Нам необходимо реализовать интерфейс, который абстрагирует факт такого мультиплексирования.
* Реализация `IRequestProcessor.SendAsync` обязана быть потокобезопасной.
*
* У нас есть готовая реализация интерфейсов `ILowLevelNetworkAdapter` и `IHighLevelNetworkAdapter`
*/
// запрос, остальные поля не интересны
using System.Collections.Concurrent;
public sealed record Request(Guid Id);
// ответ, остальные поля не интересны
public sealed record Response(Guid Id);
// высокоуровневый адаптер с внутренней очередью отправки сообщений, наподобие клиента RabbitMQ или Kafka
public interface IHighLevelNetworkAdapter
{
// ставит очередной запрос в очередь на отправку. false - очередь переполнена, запрос не будет отправлен
bool TryEnqueueWrite(Request request, CancellationToken cancellationToken);
// вычитывает очередной ответ, нельзя делать несколько одновременных вызовов ReadAsync
Task<Response> ReadAsync(CancellationToken cancellationToken);
}
// интерфейс, который надо реализовать
public interface IRequestProcessor
{
// Запускает обработчик.
// гарантированно вызывается 1 раз при инициализации приложения
Task RunAsync(CancellationToken cancellationToken);
// выполняет мягкую остановку, т.е. завершается после завершения обработки всех запросов
// гарантированно вызывается 1 раз при остановке приложения
Task StopAsync(CancellationToken cancellationToken);
// выполняет запрос, этот метод будет вызываться в приложении множеством потоков одновременно
// При отмене CancellationToken не обязательно гарантировать то, что мы не отправим запрос на сервер, но клиент должен получить отмену задачи
Task<Response> SendAsync(Request request, CancellationToken cancellationToken);
}
// простой вариант задачи:
// 1. Можно пользоваться IHighLevelNetworkAdapter, т.е. переложить очередь отправки на NetworkAdapter
// 2. Можно не реализовывать обработку `CancellationToken` в методах `IRequestProcessor`
// 3. Можно не реализовывать `IRequestProcessor.StopAsync`
public sealed class SimpleRequestProcessor : IRequestProcessor
{
private static readonly object _readLock = new();
private readonly IHighLevelNetworkAdapter _networkAdapter;
private readonly ConcurrentQueue<Request> _pendingRequestsQueue = new();
public SimpleRequestProcessor(IHighLevelNetworkAdapter networkAdapter)
{
_networkAdapter = networkAdapter;
}
public Task RunAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var hasPendingRequest = TryPeekPendingRequests(out Request? request);
if (!hasPendingRequest)
{
continue;
}
var failedToSend = TrySendRequest(request!, cancellationToken);
if (!failedToSend)
{
TryDequeuePendingRequest();
}
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
// этот метод можно не реализовывать
return Task.CompletedTask;
}
public Task<Response> SendAsync(Request request, CancellationToken cancellationToken)
{
var failedToSend = TrySendRequest(request, cancellationToken);
if (failedToSend)
{
EnqueuePendingRequests(request);
}
return ReadResponseAsync(cancellationToken);
}
private void EnqueuePendingRequests(Request request)
=> _pendingRequestsQueue.Enqueue(request);
private bool TryPeekPendingRequests(out Request? request)
=> _pendingRequestsQueue.TryPeek(out request);
private void TryDequeuePendingRequest()
=> _pendingRequestsQueue.TryDequeue(out _);
private bool TrySendRequest(Request request, CancellationToken cancellationToken)
=> _networkAdapter.TryEnqueueWrite(request, cancellationToken);
private Task<Response> ReadResponseAsync(CancellationToken cancellationToken)
{
lock (_readLock)
{
return _networkAdapter.ReadAsync(cancellationToken);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment