Skip to content

Instantly share code, notes, and snippets.

@BornaGajic
Last active February 5, 2024 08:25
Show Gist options
  • Save BornaGajic/46a80db79a410ce24446a32007cb3eea to your computer and use it in GitHub Desktop.
Save BornaGajic/46a80db79a410ce24446a32007cb3eea to your computer and use it in GitHub Desktop.
Decoupling from the Quartz.NET library
[DisallowConcurrentExecution]
public class QuartzJobAdapter<TJob> : IJobAdapter, Quartz.IJob
where TJob : IJob
{
private readonly TJob _job;
[ActivatorUtilitiesConstructor]
public QuartzJobAdapter(TJob job)
{
_job = job;
}
public async Task Execute(IJobExecutionContext context)
{
try
{
await _job.Execute(new JobContext
{
CancellationToken = context.CancellationToken,
NextFireTimeUtc = context.NextFireTimeUtc,
PreviousFireTimeUtc = context.PreviousFireTimeUtc
});
}
catch (Exception ex)
{
throw new JobExecutionException(ex);
}
}
}
public class QuartzJobFactory : PropertySettingJobFactory
{
private readonly IServiceProvider _serviceProvider;
private readonly JobActivatorCache activatorCache = new();
public QuartzJobFactory(IServiceProvider serviceProvider) => _serviceProvider = serviceProvider;
public override void ReturnJob(Quartz.IJob job)
{
(job as IDisposable)?.Dispose();
}
public override void SetObjectProperties(object obj, JobDataMap data)
{
base.SetObjectProperties(obj is ScopedJob scopedJob ? scopedJob.InnerJob : obj, data);
}
protected override Quartz.IJob InstantiateJob(TriggerFiredBundle bundle, Quartz.IScheduler scheduler)
{
var serviceScope = _serviceProvider.CreateScope();
var (innerJob, flag) = CreateJob(bundle, serviceScope.ServiceProvider);
return new ScopedJob(serviceScope, innerJob, !flag);
}
private (Quartz.IJob Job, bool FromContainer) CreateJob(TriggerFiredBundle bundle, IServiceProvider serviceProvider)
{
var innerJobType = bundle.JobDetail.JobType.GetGenericArguments().SingleOrDefault();
if (
(innerJobType?.IsAssignableTo(typeof(IJob)) ?? false)
&& !serviceProvider.GetRequiredService<IServiceProviderIsService>().IsService(innerJobType)
)
{
throw new Exception($"Register all {nameof(IJob)} implementations directly, i.e. they should be resolvable through service provider.");
}
else if (
!bundle.JobDetail.JobType.IsAssignableTo(typeof(IJobAdapter))
&& serviceProvider.GetService(bundle.JobDetail.JobType) is Quartz.IJob quartzJob
)
{
return (quartzJob, true);
}
return (activatorCache.CreateInstance(serviceProvider, bundle.JobDetail.JobType), false);
}
internal sealed class JobActivatorCache
{
private readonly ConcurrentDictionary<Type, ObjectFactory> activatorCache = new();
public Quartz.IJob CreateInstance(IServiceProvider serviceProvider, Type jobType)
{
ArgumentNullException.ThrowIfNull(serviceProvider);
ArgumentNullException.ThrowIfNull(jobType);
var orAdd = activatorCache.GetOrAdd(jobType, ActivatorUtilities.CreateFactory, Type.EmptyTypes);
return (Quartz.IJob)orAdd(serviceProvider, null);
}
}
private sealed class ScopedJob : Quartz.IJob, IDisposable
{
private readonly bool _canDispose;
private readonly IServiceScope _scope;
public ScopedJob(IServiceScope scope, Quartz.IJob innerJob, bool canDispose)
{
_scope = scope;
_canDispose = canDispose;
InnerJob = innerJob;
}
internal Quartz.IJob InnerJob { get; }
public void Dispose()
{
if (_canDispose)
{
(InnerJob as IDisposable)?.Dispose();
}
_scope.Dispose();
}
public Task Execute(IJobExecutionContext context) => InnerJob.Execute(context);
}
}
public class QuartzScheduler : IScheduler
{
private readonly Quartz.IScheduler _scheduler;
public QuartzScheduler(Quartz.IScheduler scheduler)
{
_scheduler = scheduler;
}
public async Task AddJobAsync<TJob>(string key, CancellationToken cancellationToken = default)
where TJob : IJob
{
if (await _scheduler.CheckExists(jobDetail.Key, cancellationToken))
return;
var jobDetail = CreateJobDetail<TJob>(key);
await _scheduler.AddJob(jobDetail, false, true, cancellationToken);
}
public async ValueTask StartAsync(CancellationToken cancellationToken = default)
{
if (!_scheduler.IsStarted)
await _scheduler.Start(cancellationToken);
}
public async ValueTask StopAsync(CancellationToken cancellationToken = default)
{
if (!_scheduler.IsShutdown)
await _scheduler.Shutdown(cancellationToken);
}
public Task TriggerJobAsync(string key, CancellationToken cancellationToken = default)
=> _scheduler.TriggerJob(JobKey.Create(key), cancellationToken);
private static IJobDetail CreateJobDetail<TJob>(string key)
where TJob : IJob
{
var builder = JobBuilder.Create<QuartzJobAdapter<TJob>>();
builder.WithIdentity(key);
return builder.Build();
}
private ITrigger CreateTrigger(JobKey jobKey, string cronExpression)
{
var builder = TriggerBuilder.Create();
builder.ForJob(jobKey).WithIdentity(GetTriggerKey(jobKey));
return builder.Build();
}
private TriggerKey GetTriggerKey(JobKey jobKey) => new TriggerKey(jobKey.Name);
}
public class SchedulerTestBase : TestSetup
{
public SchedulerTestBase()
{
var configuration = SetupConfiguration();
Container = SetupContainer(svc =>
{
svc.AddQuartz(cfg =>
{
cfg.UseInMemoryStore();
cfg.UseJobFactory<QuartzJobFactory>();
cfg.UseTimeZoneConverter();
});
svc.TryAddSingleton(svc => svc.GetRequiredService<ISchedulerFactory>().GetScheduler().GetAwaiter().GetResult());
svc.TryAddSingleton<IScheduler, QuartzScheduler>();
svc.AddKeyedSingleton<TaskCompletionSource>(nameof(TestJob));
svc.AddTransient<TestJob>();
});
Scheduler = Container.GetRequiredService<IScheduler>();
}
public IServiceProvider Container { get; private set; }
public IScheduler Scheduler { get; private set; }
}
public class SchedulerTest : SchedulerTestBase
{
[Fact]
public async Task T01()
{
var completion = Container.GetRequiredKeyedService<TaskCompletionSource>(nameof(TestJob));
await Scheduler.AddJobAsync<TestJob>("test-job");
await Scheduler.StartAsync();
await Scheduler.TriggerJobAsync("test-job");
await completion.Task;
}
}
public class TestJob([FromKeyedServices(nameof(TestJob))] TaskCompletionSource Completion) : IJob
{
public async Task Execute(IJobContext context)
{
Console.WriteLine($"Starting {nameof(TestJob)} execution.");
await Task.Delay(250);
Console.WriteLine($"Ending {nameof(TestJob)} execution.");
Completion.SetResult();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment