Skip to content

Instantly share code, notes, and snippets.

@aemloviji
Last active March 12, 2023 18:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aemloviji/5cfa84fcfaac5c5ab3089d0584bbe12a to your computer and use it in GitHub Desktop.
Save aemloviji/5cfa84fcfaac5c5ab3089d0584bbe12a to your computer and use it in GitHub Desktop.
/*
* Наше приложение общается с удаленным сервисом: шлет запросы и получает ответы. С удаленным сервером
* установлено единственное соединение, по которому мы шлем запросы и получаем ответы. Каждый запрос содержит Id (GUID),
* ответ на запрос содержит его же. Ответы на запросы могут приходить в произвольном порядке и с произвольными задержками.
* Нам необходимо реализовать интерфейс, который абстрагирует факт такого мультиплексирования.
* Реализация `IRequestProcessor.SendAsync` обязана быть потокобезопасной.
*
* У нас есть готовая реализация интерфейсов `ILowLevelNetworkAdapter` и `IHighLevelNetworkAdapter`
*/
// запрос, остальные поля не интересны
using System.Collections.Concurrent;
public sealed record Request(Guid Id);
// ответ, остальные поля не интересны
public sealed record Response(Guid Id);
// низкоуровневый адаптер, можно делать одновременный вызов ReadAsync и WriteAsync
// можно считать это абстракцией над полнодуплексным сокетом
public interface ILowLevelNetworkAdapter
{
// вычитывает очередной ответ, нельзя делать несколько одновременных вызовов ReadAsync
Task<Response> ReadAsync(CancellationToken cancellationToken);
// отправляет запрос, нельзя делать несколько одновременных вызовов WriteAsync
Task WriteAsync(Request request, CancellationToken cancellationToken);
}
// интерфейс, который надо реализовать
public interface IRequestProcessor
{
// Запускает обработчик.
// гарантированно вызывается 1 раз при инициализации приложения
Task RunAsync(CancellationToken cancellationToken);
// выполняет мягкую остановку, т.е. завершается после завершения обработки всех запросов
// гарантированно вызывается 1 раз при остановке приложения
Task StopAsync(CancellationToken cancellationToken);
// выполняет запрос, этот метод будет вызываться в приложении множеством потоков одновременно
// При отмене CancellationToken не обязательно гарантировать то, что мы не отправим запрос на сервер, но клиент должен получить отмену задачи
Task<Response> SendAsync(Request request, CancellationToken cancellationToken);
}
// средний вариант задачи:
// 1. можно пользоваться только ILowLevelNetworkAdapter
// 2. нужно реализовать обработку cancellationToken
// 3. можно не реализовывать `IRequestProcessor.StopAsync`
public sealed class MediumRequestProcessor : IRequestProcessor
{
private static readonly object _readLock = new();
private static readonly object _writeLock = new();
private readonly ILowLevelNetworkAdapter _networkAdapter;
private readonly ConcurrentQueue<Request> _pendingRequestsQueue = new();
public MediumRequestProcessor(ILowLevelNetworkAdapter networkAdapter)
{
_networkAdapter = networkAdapter;
}
public Task RunAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var hasPendingRequest = TryPeekPendingRequests(out Request? request);
if (!hasPendingRequest)
{
continue;
}
SendRequest(request!, cancellationToken);
TryDequeuePendingRequest();
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
// этот метод можно не реализовывать
return Task.CompletedTask;
}
public Task<Response> SendAsync(Request request, CancellationToken cancellationToken)
{
try
{
SendRequest(request, cancellationToken);
}
catch
{
EnqueuePendingRequests(request);
}
return ReadResponseAsync(cancellationToken);
}
private void EnqueuePendingRequests(Request request)
=> _pendingRequestsQueue.Enqueue(request);
private bool TryPeekPendingRequests(out Request? request)
=> _pendingRequestsQueue.TryPeek(out request);
private void TryDequeuePendingRequest()
=> _pendingRequestsQueue.TryDequeue(out _);
private Task SendRequest(Request request, CancellationToken cancellationToken)
{
lock (_writeLock)
{
return _networkAdapter.WriteAsync(request, cancellationToken);
}
}
private Task<Response> ReadResponseAsync(CancellationToken cancellationToken)
{
lock (_readLock)
{
return _networkAdapter.ReadAsync(cancellationToken);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment