Created
May 27, 2020 18:17
-
-
Save jespersh/72c0d94812cc1bdfdb985696e2fd3871 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ScheduledJobsManager : IScheduledJobsManager | |
{ | |
public List<JobScheduleDetails> JobScheduleDetails { get; } | |
public ScheduledJobsManager() | |
{ | |
JobScheduleDetails = new List<JobScheduleDetails>(); | |
} | |
public void RegisterJob(JobScheduleDetails schedule) | |
{ | |
JobScheduleDetails.Add(schedule); | |
} | |
} | |
public class JobScheduleDetails | |
{ | |
private string _schedule; | |
private bool _active; | |
public JobScheduleDetails(string schedule, string jobName) | |
{ | |
Schedule = schedule; | |
JobName = jobName; | |
Active = true; | |
} | |
public string Schedule | |
{ | |
get => _schedule; | |
set | |
{ | |
_schedule = value; | |
CronSchedule = CronExpression.Parse(_schedule); | |
UpdateNextRun(); | |
} | |
} | |
public string JobName { get; } | |
public bool Active | |
{ | |
get => _active; | |
set | |
{ | |
_active = value; | |
if (Active) UpdateNextRun(); | |
else NextRun = null; | |
} | |
} | |
public CronExpression CronSchedule { get; private set; } | |
public DateTime? NextRun { get; private set; } | |
public void UpdateNextRun() | |
{ | |
NextRun = CronSchedule.GetNextOccurrence(SystemClock.UtcNow); | |
} | |
} | |
public class BlazingJobRegistration | |
{ | |
public BlazingJobRegistration(Func<IJob> jobFactory, JobScheduleDetails schedule) | |
{ | |
JobFactory = jobFactory; | |
Schedule = schedule; | |
} | |
public Func<IJob> JobFactory { get; } | |
public JobScheduleDetails Schedule { get; } | |
} | |
public class ScheduledJobService : BackgroundService, IJobStatus | |
{ | |
private readonly List<ScheduledJobRunner> _jobs; | |
private readonly IServiceProvider _serviceProvider; | |
private readonly IScheduledJobsManager _scheduledJobsManager; | |
public ScheduledJobService(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IScheduledJobsManager scheduledJobsManager) | |
{ | |
_serviceProvider = serviceProvider; | |
_scheduledJobsManager = scheduledJobsManager; | |
var cacheClient = serviceProvider.GetService<ICacheClient>() ?? new InMemoryCacheClient(); | |
_jobs = new List<ScheduledJobRunner>(serviceProvider.GetServices<BlazingJobRegistration>().Select(j => { | |
var runner = new ScheduledJobRunner(j.JobFactory, j.Schedule, cacheClient, loggerFactory); | |
_scheduledJobsManager.RegisterJob(j.Schedule); | |
return runner; | |
})); | |
var lifetime = serviceProvider.GetService<ShutdownHostIfNoJobsRunningService>(); | |
lifetime?.RegisterHostedJobInstance(this); | |
} | |
public bool IsRunning { get; private set; } = true; | |
protected override async Task ExecuteAsync(CancellationToken stoppingToken) | |
{ | |
// TODO: Add more logging throughout | |
var startupContext = _serviceProvider.GetRequiredService<StartupActionsContext>(); | |
if (startupContext != null) | |
{ | |
var result = await startupContext.WaitForStartupAsync(stoppingToken).AnyContext(); | |
if (!result.Success) | |
{ | |
IsRunning = false; | |
throw new ApplicationException("Failed to wait for startup actions to complete."); | |
} | |
} | |
while (!stoppingToken.IsCancellationRequested) | |
{ | |
var jobsToRun = _jobs.Where(j => j.ShouldRun()).ToArray(); | |
foreach (var jobToRun in jobsToRun) | |
await jobToRun.StartAsync(stoppingToken).AnyContext(); | |
// run jobs every minute since that is the lowest resolution of the cron schedule | |
var now = SystemClock.Now; | |
var nextMinute = now.AddTicks(TimeSpan.FromMinutes(1).Ticks - (now.Ticks % TimeSpan.FromMinutes(1).Ticks)); | |
var timeUntilNextMinute = nextMinute.Subtract(SystemClock.Now).Add(TimeSpan.FromMilliseconds(1)); | |
await Task.Delay(timeUntilNextMinute, stoppingToken).AnyContext(); | |
} | |
} | |
private class ScheduledJobRunner | |
{ | |
private readonly Func<IJob> _jobFactory; | |
//private readonly CronExpression _cronSchedule; | |
private readonly ILockProvider _lockProvider; | |
private readonly ILogger _logger; | |
private readonly DateTime _baseDate = new DateTime(2010, 1, 1); | |
private string _cacheKeyPrefix; | |
public ScheduledJobRunner(Func<IJob> jobFactory, JobScheduleDetails schedule, ICacheClient cacheClient, ILoggerFactory loggerFactory = null) | |
{ | |
_jobFactory = jobFactory; | |
Schedule = schedule; | |
_logger = loggerFactory?.CreateLogger<ScheduledJobRunner>() ?? NullLogger<ScheduledJobRunner>.Instance; | |
var interval = TimeSpan.FromMinutes(1); | |
/*var nextOccurrence = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); | |
if (nextOccurrence.HasValue) | |
{ | |
var nextNextOccurrence = _cronSchedule.GetNextOccurrence(nextOccurrence.Value); | |
if (nextNextOccurrence.HasValue) | |
interval = nextNextOccurrence.Value.Subtract(nextOccurrence.Value); | |
}*/ | |
_lockProvider = new ThrottlingLockProvider(cacheClient, 1, interval.Add(interval)); | |
//NextRun = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow); | |
} | |
public JobScheduleDetails Schedule { get; } | |
public DateTime? LastRun { get; private set; } | |
public DateTime? NextRun => Schedule.NextRun; | |
public Task RunTask { get; private set; } | |
public bool ShouldRun() | |
{ | |
if (!Schedule.Active) | |
return false; | |
if (!NextRun.HasValue) | |
return false; | |
// not time yet | |
if (NextRun > SystemClock.UtcNow) | |
return false; | |
// check if already run | |
if (LastRun != null && LastRun.Value == NextRun.Value) | |
return false; | |
return true; | |
} | |
public async Task<bool> StartAsync(CancellationToken cancellationToken = default) | |
{ | |
// using lock provider in a cluster with a distributed cache implementation keeps cron jobs from running duplicates | |
// TODO: provide ability to run cron jobs on a per host isolated schedule | |
return await _lockProvider.TryUsingAsync(GetLockKey(NextRun.Value), t => | |
{ | |
// start running the job in a thread | |
RunTask = Task.Factory.StartNew(async () => | |
{ | |
var job = _jobFactory(); | |
// TODO: Don't calculate job name every time | |
string jobName = job.GetType().Name; | |
var result = await _jobFactory().TryRunAsync(cancellationToken).AnyContext(); | |
// TODO: Should we only set last run on success? Seems like that could be bad. | |
_logger.LogJobResult(result, jobName); | |
}, cancellationToken).Unwrap(); | |
LastRun = NextRun; | |
Schedule.UpdateNextRun(); | |
return Task.CompletedTask; | |
}, TimeSpan.Zero, TimeSpan.Zero).AnyContext(); | |
} | |
private string GetLockKey(DateTime date) | |
{ | |
if (_cacheKeyPrefix == null) | |
_cacheKeyPrefix = TypeHelper.GetTypeDisplayName(_jobFactory().GetType()); | |
long minute = (long)date.Subtract(_baseDate).TotalMinutes; | |
return _cacheKeyPrefix + minute; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment