Skip to content

Instantly share code, notes, and snippets.

@jespersh
Created May 27, 2020 18:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jespersh/72c0d94812cc1bdfdb985696e2fd3871 to your computer and use it in GitHub Desktop.
Save jespersh/72c0d94812cc1bdfdb985696e2fd3871 to your computer and use it in GitHub Desktop.
class ScheduledJobsManager : IScheduledJobsManager
{
public List<JobScheduleDetails> JobScheduleDetails { get; }
public ScheduledJobsManager()
{
JobScheduleDetails = new List<JobScheduleDetails>();
}
public void RegisterJob(JobScheduleDetails schedule)
{
JobScheduleDetails.Add(schedule);
}
}
public class JobScheduleDetails
{
private string _schedule;
private bool _active;
public JobScheduleDetails(string schedule, string jobName)
{
Schedule = schedule;
JobName = jobName;
Active = true;
}
public string Schedule
{
get => _schedule;
set
{
_schedule = value;
CronSchedule = CronExpression.Parse(_schedule);
UpdateNextRun();
}
}
public string JobName { get; }
public bool Active
{
get => _active;
set
{
_active = value;
if (Active) UpdateNextRun();
else NextRun = null;
}
}
public CronExpression CronSchedule { get; private set; }
public DateTime? NextRun { get; private set; }
public void UpdateNextRun()
{
NextRun = CronSchedule.GetNextOccurrence(SystemClock.UtcNow);
}
}
public class BlazingJobRegistration
{
public BlazingJobRegistration(Func<IJob> jobFactory, JobScheduleDetails schedule)
{
JobFactory = jobFactory;
Schedule = schedule;
}
public Func<IJob> JobFactory { get; }
public JobScheduleDetails Schedule { get; }
}
public class ScheduledJobService : BackgroundService, IJobStatus
{
private readonly List<ScheduledJobRunner> _jobs;
private readonly IServiceProvider _serviceProvider;
private readonly IScheduledJobsManager _scheduledJobsManager;
public ScheduledJobService(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IScheduledJobsManager scheduledJobsManager)
{
_serviceProvider = serviceProvider;
_scheduledJobsManager = scheduledJobsManager;
var cacheClient = serviceProvider.GetService<ICacheClient>() ?? new InMemoryCacheClient();
_jobs = new List<ScheduledJobRunner>(serviceProvider.GetServices<BlazingJobRegistration>().Select(j => {
var runner = new ScheduledJobRunner(j.JobFactory, j.Schedule, cacheClient, loggerFactory);
_scheduledJobsManager.RegisterJob(j.Schedule);
return runner;
}));
var lifetime = serviceProvider.GetService<ShutdownHostIfNoJobsRunningService>();
lifetime?.RegisterHostedJobInstance(this);
}
public bool IsRunning { get; private set; } = true;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// TODO: Add more logging throughout
var startupContext = _serviceProvider.GetRequiredService<StartupActionsContext>();
if (startupContext != null)
{
var result = await startupContext.WaitForStartupAsync(stoppingToken).AnyContext();
if (!result.Success)
{
IsRunning = false;
throw new ApplicationException("Failed to wait for startup actions to complete.");
}
}
while (!stoppingToken.IsCancellationRequested)
{
var jobsToRun = _jobs.Where(j => j.ShouldRun()).ToArray();
foreach (var jobToRun in jobsToRun)
await jobToRun.StartAsync(stoppingToken).AnyContext();
// run jobs every minute since that is the lowest resolution of the cron schedule
var now = SystemClock.Now;
var nextMinute = now.AddTicks(TimeSpan.FromMinutes(1).Ticks - (now.Ticks % TimeSpan.FromMinutes(1).Ticks));
var timeUntilNextMinute = nextMinute.Subtract(SystemClock.Now).Add(TimeSpan.FromMilliseconds(1));
await Task.Delay(timeUntilNextMinute, stoppingToken).AnyContext();
}
}
private class ScheduledJobRunner
{
private readonly Func<IJob> _jobFactory;
//private readonly CronExpression _cronSchedule;
private readonly ILockProvider _lockProvider;
private readonly ILogger _logger;
private readonly DateTime _baseDate = new DateTime(2010, 1, 1);
private string _cacheKeyPrefix;
public ScheduledJobRunner(Func<IJob> jobFactory, JobScheduleDetails schedule, ICacheClient cacheClient, ILoggerFactory loggerFactory = null)
{
_jobFactory = jobFactory;
Schedule = schedule;
_logger = loggerFactory?.CreateLogger<ScheduledJobRunner>() ?? NullLogger<ScheduledJobRunner>.Instance;
var interval = TimeSpan.FromMinutes(1);
/*var nextOccurrence = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow);
if (nextOccurrence.HasValue)
{
var nextNextOccurrence = _cronSchedule.GetNextOccurrence(nextOccurrence.Value);
if (nextNextOccurrence.HasValue)
interval = nextNextOccurrence.Value.Subtract(nextOccurrence.Value);
}*/
_lockProvider = new ThrottlingLockProvider(cacheClient, 1, interval.Add(interval));
//NextRun = _cronSchedule.GetNextOccurrence(SystemClock.UtcNow);
}
public JobScheduleDetails Schedule { get; }
public DateTime? LastRun { get; private set; }
public DateTime? NextRun => Schedule.NextRun;
public Task RunTask { get; private set; }
public bool ShouldRun()
{
if (!Schedule.Active)
return false;
if (!NextRun.HasValue)
return false;
// not time yet
if (NextRun > SystemClock.UtcNow)
return false;
// check if already run
if (LastRun != null && LastRun.Value == NextRun.Value)
return false;
return true;
}
public async Task<bool> StartAsync(CancellationToken cancellationToken = default)
{
// using lock provider in a cluster with a distributed cache implementation keeps cron jobs from running duplicates
// TODO: provide ability to run cron jobs on a per host isolated schedule
return await _lockProvider.TryUsingAsync(GetLockKey(NextRun.Value), t =>
{
// start running the job in a thread
RunTask = Task.Factory.StartNew(async () =>
{
var job = _jobFactory();
// TODO: Don't calculate job name every time
string jobName = job.GetType().Name;
var result = await _jobFactory().TryRunAsync(cancellationToken).AnyContext();
// TODO: Should we only set last run on success? Seems like that could be bad.
_logger.LogJobResult(result, jobName);
}, cancellationToken).Unwrap();
LastRun = NextRun;
Schedule.UpdateNextRun();
return Task.CompletedTask;
}, TimeSpan.Zero, TimeSpan.Zero).AnyContext();
}
private string GetLockKey(DateTime date)
{
if (_cacheKeyPrefix == null)
_cacheKeyPrefix = TypeHelper.GetTypeDisplayName(_jobFactory().GetType());
long minute = (long)date.Subtract(_baseDate).TotalMinutes;
return _cacheKeyPrefix + minute;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment