Skip to content

Instantly share code, notes, and snippets.

@aemloviji
Created March 12, 2023 18:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aemloviji/0cb1fe36a53fdadf4e6dcd42af250ea1 to your computer and use it in GitHub Desktop.
Save aemloviji/0cb1fe36a53fdadf4e6dcd42af250ea1 to your computer and use it in GitHub Desktop.
/*
* Наше приложение общается с удаленным сервисом: шлет запросы и получает ответы. С удаленным сервером
* установлено единственное соединение, по которому мы шлем запросы и получаем ответы. Каждый запрос содержит Id (GUID),
* ответ на запрос содержит его же. Ответы на запросы могут приходить в произвольном порядке и с произвольными задержками.
* Нам необходимо реализовать интерфейс, который абстрагирует факт такого мультиплексирования.
* Реализация `IRequestProcessor.SendAsync` обязана быть потокобезопасной.
*
* У нас есть готовая реализация интерфейсов `ILowLevelNetworkAdapter` и `IHighLevelNetworkAdapter`
*/
// запрос, остальные поля не интересны
using System.Collections.Concurrent;
public sealed record Request(Guid Id);
// ответ, остальные поля не интересны
public sealed record Response(Guid Id);
// низкоуровневый адаптер, можно делать одновременный вызов ReadAsync и WriteAsync
// можно считать это абстракцией над полнодуплексным сокетом
public interface ILowLevelNetworkAdapter
{
// вычитывает очередной ответ, нельзя делать несколько одновременных вызовов ReadAsync
Task<Response> ReadAsync(CancellationToken cancellationToken);
// отправляет запрос, нельзя делать несколько одновременных вызовов WriteAsync
Task WriteAsync(Request request, CancellationToken cancellationToken);
}
// интерфейс, который надо реализовать
public interface IRequestProcessor
{
// Запускает обработчик.
// гарантированно вызывается 1 раз при инициализации приложения
Task RunAsync(CancellationToken cancellationToken);
// выполняет мягкую остановку, т.е. завершается после завершения обработки всех запросов
// гарантированно вызывается 1 раз при остановке приложения
Task StopAsync(CancellationToken cancellationToken);
// выполняет запрос, этот метод будет вызываться в приложении множеством потоков одновременно
// При отмене CancellationToken не обязательно гарантировать то, что мы не отправим запрос на сервер, но клиент должен получить отмену задачи
Task<Response> SendAsync(Request request, CancellationToken cancellationToken);
}
// сложный вариант задачи:
// 1. можно пользоваться только ILowLevelNetworkAdapter
// 2. нужно реализовать обработку cancellationToken
// 3. нужно реализовать StopAsync, который дожидается получения ответов на уже переданные
// запросы (пока не отменен переданный в `StopAsync` `CancellationToken`)
// 4. нужно реализовать настраиваемый таймаут: если ответ на конкретный запрос не получен за заданный промежуток
// времени - отменяем задачу, которая вернулась из `SendAsync`. В том числе надо рассмотреть ситуацию,
// что ответ на запрос не придет никогда, глобальный таймаут при этом должен отработать и не допустить утечки памяти
public sealed class ComplexRequestProcessor : IRequestProcessor
{
private static readonly object _readLock = new();
private static readonly object _writeLock = new();
private readonly ILowLevelNetworkAdapter _networkAdapter;
private readonly TimeSpan _requestTimeout;
private readonly ConcurrentQueue<Request> _pendingRequestsQueue = new();
private bool _enableThrottle = false;
public ComplexRequestProcessor(ILowLevelNetworkAdapter networkAdapter, TimeSpan requestTimeout)
{
if (requestTimeout <= TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(requestTimeout));
_networkAdapter = networkAdapter;
_requestTimeout = requestTimeout;
}
public Task RunAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var hasPendingRequest = TryPeekPendingRequests(out Request? request);
if (!hasPendingRequest)
{
continue;
}
SendRequest(request!, cancellationToken);
TryDequeuePendingRequest();
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
_enableThrottle = true;
}
return Task.CompletedTask;
}
public Task<Response> SendAsync(Request request, CancellationToken cancellationToken)
{
void DoIfThrottleNotEnabled(Action action)
{
if (!_enableThrottle)
{
action();
}
}
try
{
DoIfThrottleNotEnabled(
() => SendRequest(request, cancellationToken));
}
catch
{
DoIfThrottleNotEnabled(
() => EnqueuePendingRequests(request));
}
return ReadResponseAsync(cancellationToken);
}
private void EnqueuePendingRequests(Request request)
=> _pendingRequestsQueue.Enqueue(request);
private bool TryPeekPendingRequests(out Request? request)
=> _pendingRequestsQueue.TryPeek(out request);
private void TryDequeuePendingRequest()
=> _pendingRequestsQueue.TryDequeue(out _);
private Task SendRequest(Request request, CancellationToken cancellationToken)
{
lock (_writeLock)
{
return PeriodicTask.WithTimeout(
task: () => _networkAdapter.WriteAsync(request, cancellationToken),
timeout: _requestTimeout,
cancellationToken);
}
}
private Task<Response> ReadResponseAsync(CancellationToken cancellationToken)
{
lock (_readLock)
{
return _networkAdapter.ReadAsync(cancellationToken);
}
}
}
public static class PeriodicTask
{
public static async Task WithTimeout(
Func<Task> task,
TimeSpan timeout,
CancellationToken cancellationToken)
{
var delayTask = Task.Delay(timeout, cancellationToken);
await Task.WhenAny(task(), delayTask);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment