Skip to content

Instantly share code, notes, and snippets.

@admir-live
Created August 29, 2023 10:28
Show Gist options
  • Save admir-live/828548f3d1b04badea59fb7c09bbd6ee to your computer and use it in GitHub Desktop.
Save admir-live/828548f3d1b04badea59fb7c09bbd6ee to your computer and use it in GitHub Desktop.
StreamSyncService.cs
/// <summary>
/// Background service that pulls posts from an <see cref="IBrokerService"/> into a bounded
/// in-memory channel and hands every post it reads to the configured routes.
/// </summary>
public sealed class StreamSyncService : BackgroundService, IStreamSyncService
{
    // Channel capacity, and the upper bound on how many posts are processed concurrently per batch.
    private const int DegreeOfParallelism = 100;

    // TODO: check whether the social media networks enforce per-network API limits.
    // If they do, create one channel per network type instead of this single shared channel.
    private readonly Channel<SocialMediaPost> _channel;
    private readonly ILogger<StreamSyncService> _logger;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates the service, builds the route table and allocates the bounded channel.
    /// </summary>
    /// <param name="serviceProvider">Provider used to create a scope per broker poll; also passed to every route.</param>
    /// <param name="routeFactory">Factory that supplies the route delegates.</param>
    /// <param name="logger">Logger for lifecycle and error messages.</param>
    public StreamSyncService(IServiceProvider serviceProvider, IRouteFactory routeFactory, ILogger<StreamSyncService> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
        Routes = routeFactory.CreateRoutes();

        // FullMode.Wait makes the writer wait when the channel is full instead of dropping posts.
        // SingleReader is valid because ReadAndProcessAsync is the only consumer.
        _channel = Channel.CreateBounded<SocialMediaPost>(
            new BoundedChannelOptions(DegreeOfParallelism)
            {
                FullMode = BoundedChannelFullMode.Wait,
                SingleReader = true
            });
    }

    // Route delegates keyed by network type; each receives (service provider, post, cancellation token).
    private IDictionary<NetworkTypes, Func<(IServiceProvider, SocialMediaPost, CancellationToken), Task>> Routes { get; }

    // Retries any exception up to 6 times, waiting 2^attempt seconds between attempts (2s, 4s, ... 64s).
    private static AsyncRetryPolicy Policy { get; } = Polly.Policy
        .Handle<Exception>()
        .WaitAndRetryAsync(6, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));

    /// <summary>Logs that the service is initializing; performs no other work.</summary>
    /// <param name="cancellationToken">Currently unused.</param>
    /// <returns>A completed task.</returns>
    public Task Initialize(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Initializing StreamSyncService...");
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public override Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Starting StreamSyncService...");
        return base.StartAsync(cancellationToken);
    }

    /// <inheritdoc />
    public override Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stopping StreamSyncService...");
        return base.StopAsync(cancellationToken);
    }

    /// <summary>
    /// Runs the channel producer and consumer side by side until both have finished.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the service is asked to stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await Initialize(stoppingToken);

        var writerTask = WriteToChannelAsync(_channel.Writer, stoppingToken);
        var readerTask = ReadAndProcessAsync(_channel.Reader, stoppingToken);

        // StopAsync is deliberately not called here: the host drives the stop lifecycle, and
        // StopAsync waits on the task returned by this very method.
        await Task.WhenAll(writerTask, readerTask);
    }

    /// <summary>
    /// Polls the broker in a loop and writes every returned post to the channel, then completes
    /// the writer once the loop ends.
    /// </summary>
    /// <param name="writer">Channel writer that receives the posts.</param>
    /// <param name="stoppingToken">Ends the loop and aborts pending retries and writes.</param>
    private async Task WriteToChannelAsync(ChannelWriter<SocialMediaPost> writer, CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // The token is handed to Polly so that a shutdown stops the retry/back-off
                    // sequence immediately instead of retrying the cancellation itself.
                    await Policy.ExecuteAsync(async ct =>
                    {
                        // A fresh scope per poll so the broker service is resolved and disposed per iteration.
                        using var scope = _serviceProvider.CreateScope();
                        var brokerService = scope.ServiceProvider.GetRequiredService<IBrokerService>();
                        var messages = await brokerService.GetPostsAsync(cancellationToken: ct);
                        foreach (var message in messages)
                        {
                            await writer.WriteAsync(message, ct);
                        }
                    }, stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown requested: leave the loop without reporting an error.
                    break;
                }
                catch (Exception ex)
                {
                    // Retries are exhausted; log and start the next polling round.
                    _logger.LogError(ex, "An error occurred while writing messages to the channel.");
                }
            }
        }
        finally
        {
            // Always signal the reader that no further posts will arrive.
            writer.Complete();
        }
    }

    /// <summary>
    /// Reads posts from the channel in batches of at most <see cref="DegreeOfParallelism"/> and
    /// waits for a whole batch to finish before reading the next one.
    /// </summary>
    /// <param name="reader">Channel reader to consume from.</param>
    /// <param name="stoppingToken">Cancels the wait for new posts and is passed on to processing.</param>
    private async Task ReadAndProcessAsync(ChannelReader<SocialMediaPost> reader, CancellationToken stoppingToken)
    {
        while (await reader.WaitToReadAsync(stoppingToken) && !stoppingToken.IsCancellationRequested)
        {
            var processingTasks = new List<Task>();

            // Cap the batch so the writer refilling the channel while it is being drained
            // cannot push concurrency past DegreeOfParallelism.
            while (processingTasks.Count < DegreeOfParallelism && reader.TryRead(out var message))
            {
                processingTasks.Add(ProcessMessageAsync(_serviceProvider, message, stoppingToken));
            }

            await Task.WhenAll(processingTasks);
        }
    }

    /// <summary>
    /// Invokes every configured route for the given post and waits for all of them.
    /// </summary>
    /// <param name="serviceProvider">Provider passed through to each route.</param>
    /// <param name="socialMediaPost">Post to process.</param>
    /// <param name="cancellationToken">Token passed through to each route.</param>
    private async Task ProcessMessageAsync(IServiceProvider serviceProvider, SocialMediaPost socialMediaPost, CancellationToken cancellationToken)
    {
        // NOTE(review): every route runs for every post regardless of the route's network key;
        // confirm that this fan-out is intended.
        var tasks = Routes.Select(route => ExecuteRouteAsync(serviceProvider, socialMediaPost, route, cancellationToken));
        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Runs a single route and logs, rather than propagates, any exception it throws, so one
    /// failing route does not fail the others.
    /// </summary>
    /// <param name="serviceProvider">Provider passed to the route delegate.</param>
    /// <param name="socialMediaPost">Post passed to the route delegate.</param>
    /// <param name="route">Network key and the delegate to invoke.</param>
    /// <param name="cancellationToken">Token passed to the route delegate.</param>
    private async Task ExecuteRouteAsync(IServiceProvider serviceProvider, SocialMediaPost socialMediaPost,
        KeyValuePair<NetworkTypes, Func<(IServiceProvider, SocialMediaPost, CancellationToken), Task>> route, CancellationToken cancellationToken)
    {
        try
        {
            await route.Value((serviceProvider, socialMediaPost, cancellationToken));
        }
        catch (Exception ex)
        {
            // Structured template: renders the same text as before while keeping the network as a log property.
            _logger.LogError(ex, "An error occurred while processing message for network: {Network}.", route.Key);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment