Skip to content

Instantly share code, notes, and snippets.

@sachinsu
Created March 21, 2023 09:07
Show Gist options
  • Save sachinsu/d0d2672d002be670366d58394c1d6031 to your computer and use it in GitHub Desktop.
Save sachinsu/d0d2672d002be670366d58394c1d6031 to your computer and use it in GitHub Desktop.
Asynchronous task scheduling (Queuing) in Web API
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Hosting;
namespace apiwithqueue.hostedservice {
/// <summary>
/// Hosted background service that pulls work items from an
/// <see cref="IBackgroundTaskQueue"/> and runs them one at a time, awaiting each
/// item before dequeuing the next.
/// </summary>
public class QueuedHostedService : BackgroundService
{
    private readonly ILogger<QueuedHostedService> _logger;

    /// <summary>Creates the service.</summary>
    /// <param name="taskQueue">Queue the service reads work items from.</param>
    /// <param name="logger">Logger used for lifecycle and failure messages.</param>
    public QueuedHostedService(IBackgroundTaskQueue taskQueue,
        ILogger<QueuedHostedService> logger)
    {
        TaskQueue = taskQueue;
        _logger = logger;
    }

    /// <summary>The queue this service consumes.</summary>
    public IBackgroundTaskQueue TaskQueue { get; }

    /// <summary>Logs start-up and runs the processing loop until it ends.</summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Nothing in this application listens for key presses, so the message only
        // announces that the service is running.
        _logger.LogInformation("Queued Hosted Service is running.");

        await BackgroundProcessing(stoppingToken);
    }

    /// <summary>
    /// Dequeues and executes work items until <paramref name="stoppingToken"/> is
    /// signalled. A failing work item is logged and does not stop the loop.
    /// </summary>
    private async Task BackgroundProcessing(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            Func<CancellationToken, ValueTask> workItem;
            try
            {
                workItem = await TaskQueue.DequeueAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Waiting for the next item was interrupted by shutdown; that is the
                // normal way for this loop to end, not a failure.
                break;
            }

            try
            {
                await workItem(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex,
                    "Error occurred executing {WorkItem}.", nameof(workItem));
            }
        }
    }

    /// <summary>Logs that the service is stopping, then performs the base stop.</summary>
    /// <param name="stoppingToken">Token passed through to the base implementation.</param>
    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Queued Hosted Service is stopping.");

        await base.StopAsync(stoppingToken);
    }
}
/// <summary>
/// Contract for a queue of background work items. Each work item is a delegate that
/// takes a <see cref="CancellationToken"/> and returns a <see cref="ValueTask"/>.
/// </summary>
public interface IBackgroundTaskQueue
{
    /// <summary>Adds a work item to the queue.</summary>
    /// <param name="workItem">The delegate to enqueue.</param>
    /// <returns>A <see cref="ValueTask"/> representing the enqueue operation.</returns>
    ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem);

    /// <summary>Removes a work item from the queue.</summary>
    /// <param name="cancellationToken">Token supplied to the dequeue operation.</param>
    /// <returns>A <see cref="ValueTask{TResult}"/> whose result is the dequeued delegate.</returns>
    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken);
}
/// <summary>
/// <see cref="IBackgroundTaskQueue"/> implementation backed by a bounded
/// <see cref="Channel{T}"/> of work-item delegates.
/// </summary>
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _channel;

    /// <summary>Creates a queue that holds at most <paramref name="capacity"/> items.</summary>
    /// <param name="capacity">
    /// Upper bound on queued items; size it for the expected load and the number of
    /// concurrent producers.
    /// </param>
    public BackgroundTaskQueue(int capacity)
    {
        // With BoundedChannelFullMode.Wait, a write against a full channel does not
        // complete until space frees up, so producers are throttled (backpressure)
        // when too many calls accumulate.
        _channel = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(
            new BoundedChannelOptions(capacity)
            {
                FullMode = BoundedChannelFullMode.Wait
            });
    }

    /// <summary>Writes a work item to the channel.</summary>
    /// <param name="workItem">The delegate to enqueue; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public async ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem)
    {
        if (workItem is null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        await _channel.Writer.WriteAsync(workItem);
    }

    /// <summary>Reads the next work item from the channel.</summary>
    /// <param name="cancellationToken">Token passed to the channel read.</param>
    /// <returns>The work item that was read.</returns>
    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken)
        => await _channel.Reader.ReadAsync(cancellationToken);
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using apiwithqueue.hostedservice;
namespace apiwithqueue.Controllers
{
/// <summary>
/// API controller that hands SMS sending to the background task queue instead of
/// doing the work inside the HTTP request.
/// </summary>
[ApiController]
[Route("[controller]")]
public class SMSController : ControllerBase
{
    private readonly IBackgroundTaskQueue _taskQueue;
    private readonly ILogger<SMSController> _logger;

    /// <summary>Creates the controller.</summary>
    /// <param name="taskQueue">Queue that receives the background work items.</param>
    /// <param name="logger">Logger shared with the queued work items.</param>
    public SMSController(IBackgroundTaskQueue taskQueue, ILogger<SMSController> logger)
    {
        _logger = logger;
        _taskQueue = taskQueue;
    }

    /// <summary>Returns "Hello" after a one-millisecond asynchronous delay.</summary>
    [HttpGet("Hello")]
    public async Task<ActionResult<String>> Hello()
    {
        await Task.Delay(1);
        return "Hello";
    }

    /// <summary>
    /// Enqueues a work item whose payload is the current local short time string and
    /// returns "Task queued" once the enqueue call completes, without waiting for the
    /// work item to run.
    /// </summary>
    [HttpGet("SendSMS")]
    public async Task<ActionResult<string>> SendSMS()
    {
        string payload = DateTime.Now.ToShortTimeString();
        await _taskQueue.QueueBackgroundWorkItemAsync(new SMSGateway(_logger, payload).SendSMSAsync);
        return "Task queued";
    }

    /// <summary>Work item that carries one payload and simulates sending it.</summary>
    private class SMSGateway
    {
        // Number of simulated send steps. The loop bound, the progress message and
        // the completion check all use this one value so they cannot disagree.
        private const int TotalSteps = 1;

        private readonly ILogger<SMSController> _logger;
        private readonly string _payload;

        public SMSGateway(ILogger<SMSController> logger, string payload)
        {
            _logger = logger;
            _payload = payload;
        }

        /// <summary>
        /// Runs the simulated send (a two-second delay per step) and logs whether it
        /// completed or was cancelled. Cancellation is not rethrown.
        /// </summary>
        /// <param name="token">Token that cancels the send.</param>
        internal async ValueTask SendSMSAsync(CancellationToken token)
        {
            int delayLoop = 0;
            var guid = Guid.NewGuid();
            _logger.LogInformation("Queued work item {Guid} is starting.", guid);

            while (!token.IsCancellationRequested && delayLoop < TotalSteps)
            {
                try
                {
                    //todo: Code to send SMS
                    await Task.Delay(TimeSpan.FromSeconds(2), token);
                }
                catch (OperationCanceledException)
                {
                    // The step was interrupted: stop without counting it as done so
                    // the outcome below is reported as cancelled.
                    break;
                }

                ++delayLoop;
                _logger.LogInformation(
                    "Queued work item {Guid} is running. {DelayLoop}/{TotalSteps}",
                    guid, delayLoop, TotalSteps);
            }

            if (delayLoop == TotalSteps)
            {
                _logger.LogInformation(
                    "Queued Background Task {Guid} and {_payload} is complete.", guid, _payload);
            }
            else
            {
                _logger.LogInformation(
                    "Queued Background Task {Guid} and {_payload} was cancelled.", guid, _payload);
            }
        }
    }
}
}
using System;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.AspNetCore.Builder;
// Application entry point: registers MVC controllers, Swagger, the queued hosted
// service and its backing queue, then builds and runs the HTTP pipeline.
var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

builder.Services.AddHostedService<apiwithqueue.hostedservice.QueuedHostedService>();
builder.Services.AddSingleton<apiwithqueue.hostedservice.IBackgroundTaskQueue>(_ =>
{
    // Capacity comes from the "QueueCapacity" configuration key. A missing,
    // non-numeric or non-positive value falls back to 100, because a bounded
    // channel cannot be created with a capacity below 1.
    if (!int.TryParse(builder.Configuration["QueueCapacity"], out var queueCapacity)
        || queueCapacity < 1)
    {
        queueCapacity = 100;
    }

    return new apiwithqueue.hostedservice.BackgroundTaskQueue(queueCapacity);
});

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    // Swagger endpoints are exposed only in the Development environment.
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment