Created
March 21, 2023 09:07
-
-
Save sachinsu/d0d2672d002be670366d58394c1d6031 to your computer and use it in GitHub Desktop.
Asynchronous task scheduling (Queuing) in Web API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using System.Threading.Channels; | |
using Microsoft.Extensions.Logging; | |
using Microsoft.Extensions.Hosting; | |
namespace apiwithqueue.hostedservice { | |
public class QueuedHostedService : BackgroundService | |
{ | |
private readonly ILogger<QueuedHostedService> _logger; | |
public QueuedHostedService(IBackgroundTaskQueue taskQueue, | |
ILogger<QueuedHostedService> logger) | |
{ | |
TaskQueue = taskQueue; | |
_logger = logger; | |
} | |
public IBackgroundTaskQueue TaskQueue { get; } | |
protected override async Task ExecuteAsync(CancellationToken stoppingToken) | |
{ | |
_logger.LogInformation( | |
$"Queued Hosted Service is running.{Environment.NewLine}" + | |
$"{Environment.NewLine}Tap W to add a work item to the " + | |
$"background queue.{Environment.NewLine}"); | |
await BackgroundProcessing(stoppingToken); | |
} | |
private async Task BackgroundProcessing(CancellationToken stoppingToken) | |
{ | |
while (!stoppingToken.IsCancellationRequested) | |
{ | |
var workItem = | |
await TaskQueue.DequeueAsync(stoppingToken); | |
try | |
{ | |
await workItem(stoppingToken); | |
} | |
catch (Exception ex) | |
{ | |
_logger.LogError(ex, | |
"Error occurred executing {WorkItem}.", nameof(workItem)); | |
} | |
} | |
} | |
public override async Task StopAsync(CancellationToken stoppingToken) | |
{ | |
_logger.LogInformation("Queued Hosted Service is stopping."); | |
await base.StopAsync(stoppingToken); | |
} | |
} | |
public interface IBackgroundTaskQueue | |
{ | |
ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem); | |
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync( | |
CancellationToken cancellationToken); | |
} | |
public class BackgroundTaskQueue : IBackgroundTaskQueue | |
{ | |
private readonly Channel<Func<CancellationToken, ValueTask>> _queue; | |
public BackgroundTaskQueue(int capacity) | |
{ | |
// Capacity should be set based on the expected application load and | |
// number of concurrent threads accessing the queue. | |
// BoundedChannelFullMode.Wait will cause calls to WriteAsync() to return a task, | |
// which completes only when space became available. This leads to backpressure, | |
// in case too many publishers/calls start accumulating. | |
var options = new BoundedChannelOptions(capacity) | |
{ | |
FullMode = BoundedChannelFullMode.Wait | |
}; | |
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options); | |
} | |
public async ValueTask QueueBackgroundWorkItemAsync( | |
Func<CancellationToken, ValueTask> workItem) | |
{ | |
if (workItem == null) | |
{ | |
throw new ArgumentNullException(nameof(workItem)); | |
} | |
await _queue.Writer.WriteAsync(workItem); | |
} | |
public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync( | |
CancellationToken cancellationToken) | |
{ | |
var workItem = await _queue.Reader.ReadAsync(cancellationToken); | |
return workItem; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.AspNetCore.Mvc; | |
using Microsoft.Extensions.Logging; | |
using apiwithqueue.hostedservice; | |
namespace apiwithqueue.Controllers | |
{ | |
[ApiController] | |
[Route("[controller]")] | |
public class SMSController : ControllerBase | |
{ | |
private readonly IBackgroundTaskQueue _taskQueue; | |
private readonly ILogger<SMSController> _logger; | |
public SMSController(IBackgroundTaskQueue taskQueue ,ILogger<SMSController> logger) | |
{ | |
_logger = logger; | |
_taskQueue = taskQueue; | |
} | |
[HttpGet("Hello")] | |
public async Task<ActionResult<String>> Hello() | |
{ | |
await Task.Delay(1); | |
return "Hello"; | |
} | |
[HttpGet("SendSMS")] | |
public async Task<ActionResult<string>> SendSMS() | |
{ | |
string payload = DateTime.Now.ToShortTimeString(); | |
await _taskQueue.QueueBackgroundWorkItemAsync(new SMSGateway(_logger, payload).SendSMSAsync); | |
return "Task queued"; | |
} | |
class SMSGateway { | |
private readonly ILogger<SMSController> _logger; | |
private readonly string _payload; | |
public SMSGateway(ILogger<SMSController> logger, string payload) { | |
_logger = logger; | |
_payload = payload; | |
} | |
internal async ValueTask SendSMSAsync(CancellationToken token) | |
{ | |
int delayLoop = 0; | |
var guid = Guid.NewGuid(); | |
_logger.LogInformation("Queued work item {Guid} is starting.", guid); | |
while (!token.IsCancellationRequested && delayLoop < 1) | |
{ | |
try | |
{ | |
//todo: Code to send SMS | |
await Task.Delay(TimeSpan.FromSeconds(2), token); | |
} | |
catch (OperationCanceledException) | |
{ | |
// Prevent throwing if the Delay is cancelled | |
} | |
++ delayLoop; | |
_logger.LogInformation("Queued work item {Guid} is running. {DelayLoop}/3", guid, delayLoop); | |
} | |
string format = delayLoop switch | |
{ | |
3 => "Queued Background Task {Guid} and {_payload} is complete.", | |
_ => "Queued Background Task {Guid} and {_payload} was cancelled." | |
}; | |
_logger.LogInformation(format, guid,_payload); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using Microsoft.AspNetCore.Hosting; | |
using Microsoft.Extensions.DependencyInjection; | |
using Microsoft.AspNetCore.Builder; | |
var builder = WebApplication.CreateBuilder(args); | |
// Add services to the container. | |
builder.Services.AddControllers(); | |
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle | |
builder.Services.AddEndpointsApiExplorer(); | |
builder.Services.AddSwaggerGen(); | |
builder.Services.AddHostedService<apiwithqueue.hostedservice.QueuedHostedService>(); | |
builder.Services.AddSingleton<apiwithqueue.hostedservice.IBackgroundTaskQueue>(ctx => | |
{ | |
// if (!int.TryParse(hostContext.Configuration["QueueCapacity"], out var queueCapacity)) | |
// queueCapacity = 100; | |
return new apiwithqueue.hostedservice.BackgroundTaskQueue(100); | |
}); | |
var app = builder.Build(); | |
// Configure the HTTP request pipeline. | |
if (app.Environment.IsDevelopment()) | |
{ | |
app.UseSwagger(); | |
app.UseSwaggerUI(); | |
} | |
app.UseHttpsRedirection(); | |
app.UseAuthorization(); | |
app.MapControllers(); | |
app.Run(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment