Created
August 29, 2023 10:28
-
-
Save admir-live/828548f3d1b04badea59fb7c09bbd6ee to your computer and use it in GitHub Desktop.
StreamSyncService.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Background service that pulls <see cref="SocialMediaPost"/> items from an
/// <see cref="IBrokerService"/> into a bounded channel and hands every post read from the
/// channel to each route produced by the <see cref="IRouteFactory"/>.
/// </summary>
public sealed class StreamSyncService : BackgroundService, IStreamSyncService
{
    // Capacity of the bounded channel: the writer waits once this many posts are queued.
    private const int DegreeOfParallelism = 100;

    // TODO: find out whether the social media networks impose per-network API limits.
    // If they do, create one channel per network type instead of a single shared channel.
    private readonly Channel<SocialMediaPost> _channel;
    private readonly ILogger<StreamSyncService> _logger;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates the service, builds the route table and allocates the bounded channel.
    /// </summary>
    /// <param name="serviceProvider">Root provider used to create a scope per broker poll and passed to routes.</param>
    /// <param name="routeFactory">Factory that supplies the per-network route delegates.</param>
    /// <param name="logger">Logger for lifecycle and error messages.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public StreamSyncService(IServiceProvider serviceProvider, IRouteFactory routeFactory, ILogger<StreamSyncService> logger)
    {
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        if (routeFactory == null)
        {
            throw new ArgumentNullException(nameof(routeFactory));
        }

        Routes = routeFactory.CreateRoutes();
        _channel = Channel.CreateBounded<SocialMediaPost>(new BoundedChannelOptions(DegreeOfParallelism)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true
        });
    }

    // Route delegates keyed by network type; every post is offered to every route.
    private IDictionary<NetworkTypes, Func<(IServiceProvider, SocialMediaPost, CancellationToken), Task>> Routes { get; }

    // Retries any exception up to 6 times with exponential back-off (2, 4, 8, ... 64 seconds).
    private static AsyncRetryPolicy Policy { get; } = Polly.Policy
        .Handle<Exception>()
        .WaitAndRetryAsync(6, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));

    /// <summary>Logs that the service is initializing; performs no other work.</summary>
    /// <param name="cancellationToken">Currently unused.</param>
    /// <returns>A completed task.</returns>
    public Task Initialize(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Initializing StreamSyncService...");
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public override Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Starting StreamSyncService...");
        return base.StartAsync(cancellationToken);
    }

    /// <inheritdoc />
    public override Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stopping StreamSyncService...");
        return base.StopAsync(cancellationToken);
    }

    /// <summary>
    /// Runs the channel writer and reader concurrently until both finish.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await Initialize(stoppingToken);

        var writerTask = WriteToChannelAsync(_channel.Writer, stoppingToken);
        var readerTask = ReadAndProcessAsync(_channel.Reader, stoppingToken);
        await Task.WhenAll(writerTask, readerTask);

        // StopAsync is intentionally NOT called here: the host drives the stop lifecycle,
        // and calling it from inside the executing task re-enters the stop logic.
    }

    /// <summary>
    /// Repeatedly fetches posts from the broker and writes them to the channel until
    /// cancellation is requested, then marks the channel as complete.
    /// </summary>
    /// <param name="writer">Writer side of the post channel.</param>
    /// <param name="stoppingToken">Token that ends the polling loop.</param>
    private async Task WriteToChannelAsync(ChannelWriter<SocialMediaPost> writer, CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // The token is handed to Polly so that neither a new attempt nor a
                    // back-off wait is started after shutdown has been requested.
                    await Policy.ExecuteAsync(async cancellationToken =>
                    {
                        // A fresh scope per poll keeps scoped dependencies of the broker short-lived.
                        using var scope = _serviceProvider.CreateScope();
                        var brokerService = scope.ServiceProvider.GetRequiredService<IBrokerService>();
                        var messages = await brokerService.GetPostsAsync(cancellationToken: cancellationToken);
                        foreach (var message in messages)
                        {
                            await writer.WriteAsync(message, cancellationToken);
                        }
                    }, stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Normal shutdown, not an error.
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "An error occurred while writing messages to the channel.");
                }
            }
        }
        finally
        {
            // Always release the reader, even if the loop exits unexpectedly.
            writer.TryComplete();
        }
    }

    /// <summary>
    /// Drains the channel in batches: every post currently available is dispatched, and the
    /// batch is awaited before waiting for more data.
    /// </summary>
    /// <param name="reader">Reader side of the post channel.</param>
    /// <param name="stoppingToken">Token that ends the read loop.</param>
    private async Task ReadAndProcessAsync(ChannelReader<SocialMediaPost> reader, CancellationToken stoppingToken)
    {
        try
        {
            while (await reader.WaitToReadAsync(stoppingToken) && !stoppingToken.IsCancellationRequested)
            {
                var processingTasks = new List<Task>();
                while (reader.TryRead(out var message))
                {
                    processingTasks.Add(ProcessMessageAsync(_serviceProvider, message, stoppingToken));
                }

                await Task.WhenAll(processingTasks);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // WaitToReadAsync throws once the stopping token fires; that is the expected exit path.
        }
    }

    /// <summary>Offers a single post to every configured route and waits for all of them.</summary>
    /// <param name="serviceProvider">Provider forwarded to each route delegate.</param>
    /// <param name="socialMediaPost">The post to dispatch.</param>
    /// <param name="cancellationToken">Token forwarded to each route delegate.</param>
    private async Task ProcessMessageAsync(IServiceProvider serviceProvider, SocialMediaPost socialMediaPost, CancellationToken cancellationToken)
    {
        var tasks = Routes.Select(route => ExecuteRouteAsync(serviceProvider, socialMediaPost, route, cancellationToken));
        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Invokes one route delegate and logs (rather than propagates) any exception it throws,
    /// so a failure on one network does not affect the others.
    /// </summary>
    /// <param name="serviceProvider">Provider forwarded to the route delegate.</param>
    /// <param name="socialMediaPost">The post to dispatch.</param>
    /// <param name="route">Network type together with its handler delegate.</param>
    /// <param name="cancellationToken">Token forwarded to the route delegate.</param>
    private async Task ExecuteRouteAsync(IServiceProvider serviceProvider, SocialMediaPost socialMediaPost,
        KeyValuePair<NetworkTypes, Func<(IServiceProvider, SocialMediaPost, CancellationToken), Task>> route, CancellationToken cancellationToken)
    {
        try
        {
            await route.Value((serviceProvider, socialMediaPost, cancellationToken));
        }
        catch (Exception ex)
        {
            // Message template instead of interpolation keeps the network as a structured property.
            _logger.LogError(ex, "An error occurred while processing message for network: {Network}.", route.Key);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment