// Gist 53ad8187770e75d7a813 by @meisinger, created May 29, 2014 03:32.
// Example of a task controller.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using StructureMap;
namespace Task.Services.Service
{
/// <summary>
/// Polls the schedule provider for tasks that are due and runs each of them on its own
/// long-running task, tracking the active executions through the active-context provider.
/// </summary>
/// <remarks>
/// <c>System.Threading.Tasks.Task</c> is written fully qualified throughout this class because
/// the file's root namespace is itself called <c>Task</c>, so the simple name would bind to the
/// namespace instead of the type.
/// </remarks>
public class Controller
{
    /// <summary>Pause between two consecutive polls of the schedule.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(15);

    private readonly IActiveTaskContextProvider activeContexts;
    private readonly IScheduleProvider scheduleProvider;
    private readonly ITaskProvider taskProvider;
    private readonly ILogger logger;

    /// <summary>Creates a controller over the given schedule, task and active-context providers.</summary>
    /// <exception cref="ArgumentNullException">Any of the providers is <c>null</c>.</exception>
    public Controller(IScheduleProvider scheduleProvider, ITaskProvider taskProvider, IActiveTaskContextProvider activeContexts)
    {
        if (scheduleProvider == null)
            throw new ArgumentNullException("scheduleProvider");
        if (taskProvider == null)
            throw new ArgumentNullException("taskProvider");
        if (activeContexts == null)
            throw new ArgumentNullException("activeContexts");

        this.scheduleProvider = scheduleProvider;
        this.taskProvider = taskProvider;
        this.activeContexts = activeContexts;
        logger = LoggerFactory.CreateLogger("Controller");
    }

    /// <summary>
    /// Runs the polling loop until <paramref name="token"/> is cancelled, then blocks until the
    /// tasks that are still running have finished.
    /// </summary>
    /// <param name="token">Service-level cancellation token; it is also handed to every started task.</param>
    public void RunThread(CancellationToken token)
    {
        using (var manual = new ManualResetEventSlim())
        {
            while (!token.IsCancellationRequested)
            {
                var identifiers = scheduleProvider.IdentifyTasksToExecute(DateTime.Now);
                logger.DebugFormat("Identified {0} tasks that should be ran.", identifiers.Length);

                foreach (var identifier in identifiers)
                {
                    logger.TraceFormat("Identified \"{0}\" should be ran.", identifier);

                    ActiveTaskContext context;
                    if (activeContexts.TryGetValue(identifier, out context) && context.IsRunning)
                    {
                        logger.WarnFormat("Task \"{0}\" has been scheduled to run but is currently active.",
                            identifier);
                        // "HH" (24-hour clock): "hh" without an AM/PM designator is ambiguous.
                        logger.InfoFormat("Task \"{0}\" has been active since {1:yyyy-MM-dd HH:mm:ss}.",
                            context.TaskName, context.StartDate);
                        continue;
                    }

                    ExecuteTask(identifier, token);
                }

                try
                {
                    // The event is never set; Wait only serves as a sleep that cancellation can interrupt.
                    manual.Wait(PollInterval, token);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }

        // A context may be disposed by its continuation while we collect the tasks, which
        // leaves RunningTask null; Task.WaitAll rejects null entries, so filter them out.
        var runningTasks = activeContexts.GetRunningContexts()
            .Select(x => x.RunningTask)
            .Where(x => x != null)
            .ToArray();
        WaitOnTasks(runningTasks);
    }

    /// <summary>
    /// Removes the task from the schedule, loads its configuration and starts it, either as a
    /// single task or, for a container, as a sequence of child tasks.
    /// </summary>
    /// <param name="taskId">Identifier of the task to run.</param>
    /// <param name="token">Service-level cancellation token.</param>
    public void ExecuteTask(string taskId, CancellationToken token)
    {
        scheduleProvider.PopTaskFromConsideration(taskId);

        var model = taskProvider.LoadTaskConfiguration(taskId);
        if (model == null)
        {
            logger.WarnFormat("Unable to identify configuration for task \"{0}\".", taskId);
            return;
        }

        // ActiveTaskContext.Create takes exactly (taskId, taskName).
        var context = ActiveTaskContext.Create(model.TaskId, model.TaskName);
        if (!model.IsContainer)
        {
            ExecuteTask(context, model, token);
            return;
        }

        var collection = IdentifyTasksToExecute(model)
            .OfType<TaskModel>()
            .ToArray();
        ExecuteTaskContainer(context, collection, token);
    }

    /// <summary>Starts a single task described by <paramref name="model"/>.</summary>
    /// <param name="context">Context that tracks the execution.</param>
    /// <param name="model">Configuration of the task to run.</param>
    /// <param name="token">Service-level cancellation token.</param>
    public void ExecuteTask(ActiveTaskContext context, TaskModel model, CancellationToken token)
    {
        StartTask(context, token, () =>
        {
            var instance = LoadTaskInstance(model);
            logger.InfoFormat("Executing task \"{0}\".", model.TaskName);
            instance.Execute(context.Token);
        });
    }

    /// <summary>Starts one task that runs the given child tasks one after another.</summary>
    /// <param name="context">Context that tracks the execution of the whole container.</param>
    /// <param name="models">Child task configurations, executed in array order.</param>
    /// <param name="token">Service-level cancellation token.</param>
    public void ExecuteTaskContainer(ActiveTaskContext context, TaskModel[] models, CancellationToken token)
    {
        StartTask(context, token, () =>
        {
            foreach (var model in models)
            {
                // Service shutdown aborts the container; a task-level cancel just stops the sequence.
                context.ServiceToken.ThrowIfCancellationRequested();
                if (context.Token.IsCancellationRequested)
                    break;

                var instance = LoadTaskInstance(model);
                logger.InfoFormat("Executing task \"{0}\".", model.TaskName);
                instance.Execute(context.Token);
            }
        });
    }

    /// <summary>
    /// Wraps <paramref name="body"/> in a long-running task, attaches the completion,
    /// cancellation and fault continuations, registers the context and starts the task.
    /// </summary>
    private void StartTask(ActiveTaskContext context, CancellationToken token, Action body)
    {
        var task = new System.Threading.Tasks.Task(() =>
        {
            try
            {
                body();
            }
            catch (OperationCanceledException)
            {
                // Rethrows only when the service token was cancelled; a cancellation of the
                // task's own token is swallowed, so the task then ends as "ran to completion".
                context.ServiceToken.ThrowIfCancellationRequested();
            }
        }, token, TaskCreationOptions.LongRunning);

        task.ContinueWith(_ => ContinueOnComplete(context), TaskContinuationOptions.OnlyOnRanToCompletion);
        task.ContinueWith(_ => ContinueOnCancellation(context), TaskContinuationOptions.OnlyOnCanceled);
        task.ContinueWith(faulted => ContinueOnFault(faulted, context), TaskContinuationOptions.OnlyOnFaulted);

        activeContexts.TryAdd(context.TaskId, context);
        // Must happen before Start: the task body reads context.Token and context.ServiceToken.
        context.SetActiveTask(task, token);
        task.Start();
    }

    /// <summary>
    /// Continuation for a task that ran to completion: unregisters the context, re-schedules
    /// the task unless it was cancelled, and disposes the context.
    /// </summary>
    private void ContinueOnComplete(ActiveTaskContext context)
    {
        // Read once so the log message and the re-scheduling decision cannot disagree.
        var cancelled = context.WasTaskCancelled;
        var messageFormat = (!cancelled)
            ? "Task \"{0}\" completed."
            : "Task \"{0}\" was cancelled. This task may not have completed.";
        logger.InfoFormat(messageFormat, context.TaskName);

        if (!TryRemoveContext(context))
            return;

        if (!cancelled)
            scheduleProvider.PushTaskForConsideration(context.TaskId);
        context.Dispose();
    }

    /// <summary>Continuation for a cancelled task: unregisters and disposes the context.</summary>
    private void ContinueOnCancellation(ActiveTaskContext context)
    {
        logger.InfoFormat("Task \"{0}\" was asked to stop. This task shut down gracefully.", context.TaskName);

        if (!TryRemoveContext(context))
            return;

        logger.WarnFormat("Removed task \"{0}\" from the dictionary.", context.TaskName);
        context.Dispose();
    }

    /// <summary>Continuation for a faulted task: logs the failure, then unregisters and disposes the context.</summary>
    private void ContinueOnFault(System.Threading.Tasks.Task task, ActiveTaskContext context)
    {
        logger.ErrorFormat(task.Exception, "Fatal error occured with task \"{0}\".", context.TaskName);

        if (!TryRemoveContext(context))
            return;

        logger.WarnFormat("Removed task \"{0}\" from the dictionary.", context.TaskName);
        context.Dispose();
    }

    /// <summary>
    /// Removes the context from the active set, logging a warning when it is no longer there.
    /// </summary>
    /// <returns><c>true</c> when the context was removed.</returns>
    private bool TryRemoveContext(ActiveTaskContext context)
    {
        ActiveTaskContext removed;
        if (activeContexts.TryRemove(context.TaskId, out removed))
            return true;

        logger.WarnFormat(
            "Unable to remove task \"{0}\". This may occur if the task was already removed or evicted from the dictionary.",
            context.TaskName);
        return false;
    }

    /// <summary>
    /// Returns the children of a container task, restricted to the scheduled execution plan
    /// when one exists; plan entries that are not children of the container are logged.
    /// </summary>
    private IEnumerable<TaskModel> IdentifyTasksToExecute(TaskModel model)
    {
        var collection = model.Items;
        var executionPlan = scheduleProvider.IdentifyExecutionPlan(model.TaskId);
        if (executionPlan.Any())
        {
            // NOTE(review): Join keeps the order of the outer sequence (model.Items), so the plan
            // only filters the children here and does not reorder them - confirm this is intended.
            collection = collection
                .Join(executionPlan, o => o.TaskId, i => i, (o, i) => o)
                .ToArray();

            var invalidEntries = executionPlan
                .Except(model.Items.Select(x => x.TaskId))
                .ToArray();
            if (invalidEntries.Any())
            {
                logger.WarnFormat(
                    "Invalid execution plan found for task \"{0}\". The following tasks are invalid for this configuration \"{1}\".",
                    model.TaskId, string.Join(", ", invalidEntries));
            }
        }

        return collection;
    }

    /// <summary>
    /// Resolves the first registered task factory that accepts the task and asks it for an instance.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// No factory accepts the task, or the factory returned <c>null</c>.
    /// </exception>
    private ITask LoadTaskInstance(TaskModel model)
    {
        var taskInfo = new TaskInfoModel
        {
            TaskId = model.TaskId,
            TaskName = model.TaskName
        };

        var factory = ObjectFactory
            .GetAllInstances<ITaskFactory>()
            .FirstOrDefault(x => x.IsSatisfiedBy(taskInfo));
        if (factory == null)
        {
            var message =
                string.Format("No corresponding Task Factory could be found for task \"{0}\". Task will not be executed.",
                    model.TaskName);
            throw new InvalidOperationException(message);
        }

        var instance = factory.GetTask(taskInfo);
        if (instance == null)
        {
            var message =
                string.Format("Unable to create instance for task \"{0}\". Task will not be executed.",
                    model.TaskName);
            throw new InvalidOperationException(message);
        }

        return instance;
    }

    /// <summary>
    /// Blocks until every given task has finished; failures are logged instead of propagated.
    /// </summary>
    private void WaitOnTasks(System.Threading.Tasks.Task[] tasks)
    {
        if (tasks == null || tasks.Length == 0)
            return;

        try
        {
            System.Threading.Tasks.Task.WaitAll(tasks);
        }
        catch (AggregateException ex)
        {
            logger.WarnFormat(ex, "Expected error occurred while waiting for \"{0}\" task(s) to complete.",
                tasks.Length);
        }
        catch (Exception ex)
        {
            logger.ErrorFormat(ex, "Unknown error occurred while waiting for \"{0}\" task(s) to complete.",
                tasks.Length);
        }
    }
}
/// <summary>
/// Keeps the active task contexts in a concurrent dictionary keyed by task identifier.
/// </summary>
public class ActiveTaskContextProvider : IActiveTaskContextProvider
{
    private readonly ConcurrentDictionary<string, ActiveTaskContext> activeContexts =
        new ConcurrentDictionary<string, ActiveTaskContext>();

    /// <summary>Creates an empty provider.</summary>
    public ActiveTaskContextProvider()
    {
    }

    /// <summary>All contexts currently held by the provider.</summary>
    public ICollection<ActiveTaskContext> Values
    {
        get { return activeContexts.Values; }
    }

    /// <summary>Adds the context under <paramref name="taskId"/>; returns <c>false</c> when the key already exists.</summary>
    public bool TryAdd(string taskId, ActiveTaskContext context)
    {
        var added = activeContexts.TryAdd(taskId, context);
        return added;
    }

    /// <summary>Looks up the context stored under <paramref name="taskId"/>.</summary>
    public bool TryGetValue(string taskId, out ActiveTaskContext context)
    {
        var found = activeContexts.TryGetValue(taskId, out context);
        return found;
    }

    /// <summary>Removes and returns the context stored under <paramref name="taskId"/>.</summary>
    public bool TryRemove(string taskId, out ActiveTaskContext context)
    {
        var removed = activeContexts.TryRemove(taskId, out context);
        return removed;
    }

    /// <summary>Returns the contexts whose <c>IsRunning</c> flag is set.</summary>
    public ActiveTaskContext[] GetRunningContexts()
    {
        var running = new List<ActiveTaskContext>();
        foreach (var candidate in activeContexts.Values)
        {
            if (candidate.IsRunning)
                running.Add(candidate);
        }

        return running.ToArray();
    }

    /// <summary>Reports whether the provider holds at least one context.</summary>
    public bool Any()
    {
        return activeContexts.Any();
    }

    /// <summary>
    /// Asks the context stored under <paramref name="taskId"/> to cancel its task.
    /// </summary>
    /// <returns>
    /// The result of the context's <c>CancelTask</c>, or <c>false</c> when no context is stored under that id.
    /// </returns>
    public bool CancelTask(string taskId)
    {
        ActiveTaskContext match;
        return activeContexts.TryGetValue(taskId, out match) && match.CancelTask();
    }
}
/// <summary>
/// Tracks one task execution: the running task, its start time, and the cancellation sources
/// that let the task be stopped either individually or together with the whole service.
/// </summary>
/// <remarks>
/// <c>System.Threading.Tasks.Task</c> is written fully qualified because the file's root
/// namespace is itself called <c>Task</c>, so the simple name would bind to the namespace.
/// </remarks>
public class ActiveTaskContext : IDisposable
{
    private System.Threading.Tasks.Task activeTask;
    // Cancelled when either the service token or internalSource is cancelled.
    private CancellationTokenSource linkedSource;
    // Cancelled by CancelTask to stop only this task.
    private CancellationTokenSource internalSource;
    // Time of the first CancelTask call; written here but not read anywhere in this class.
    private DateTime cancelTime = DateTime.MaxValue;

    /// <summary>Identifier of the tracked task.</summary>
    public string TaskId { get; private set; }

    /// <summary>Display name of the tracked task.</summary>
    public string TaskName { get; private set; }

    /// <summary>Local time at which <see cref="SetActiveTask"/> was last called.</summary>
    public DateTime StartDate { get; private set; }

    /// <summary>The service-level token passed to <see cref="SetActiveTask"/>.</summary>
    public CancellationToken ServiceToken { get; private set; }

    /// <summary><c>true</c> while a task is attached and its status is <c>Running</c>.</summary>
    public bool IsRunning
    {
        get
        {
            // Copy the field first: Dispose may null it on another thread between the two reads.
            var task = activeTask;
            return task != null && task.Status == TaskStatus.Running;
        }
    }

    /// <summary>The attached task, or <c>null</c> before activation and after disposal.</summary>
    public System.Threading.Tasks.Task RunningTask
    {
        get { return activeTask; }
    }

    /// <summary>
    /// Token that is cancelled by either the service token or <see cref="CancelTask"/>;
    /// <see cref="CancellationToken.None"/> before activation and after disposal.
    /// </summary>
    public CancellationToken Token
    {
        get
        {
            var source = linkedSource;
            return source != null ? source.Token : CancellationToken.None;
        }
    }

    /// <summary>
    /// <c>true</c> when cancellation was requested on <see cref="Token"/>;
    /// <c>false</c> before activation and after disposal.
    /// </summary>
    public bool WasTaskCancelled
    {
        get
        {
            var source = linkedSource;
            return source != null && source.IsCancellationRequested;
        }
    }

    /// <summary><c>true</c> when cancellation was requested on <see cref="ServiceToken"/>.</summary>
    public bool WasServiceCancelled
    {
        get { return (ServiceToken.IsCancellationRequested); }
    }

    /// <summary>
    /// Releases the attached task and both cancellation sources. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        var task = activeTask;
        if (task != null)
        {
            // Task.Dispose throws on a task that has not finished; only dispose completed
            // tasks so that Dispose itself never throws.
            if (task.IsCompleted)
                task.Dispose();
            activeTask = null;
        }

        if (linkedSource != null)
        {
            linkedSource.Dispose();
            linkedSource = null;
        }

        if (internalSource != null)
        {
            internalSource.Dispose();
            internalSource = null;
        }
    }

    /// <summary>Requests cancellation of this task only.</summary>
    /// <returns>
    /// <c>true</c> when cancellation was requested by this call; <c>false</c> when no task is
    /// attached or cancellation had already been requested.
    /// </returns>
    public bool CancelTask()
    {
        if (internalSource == null)
            return false;
        if (internalSource.IsCancellationRequested)
            return false;

        internalSource.Cancel();
        if (cancelTime == DateTime.MaxValue)
            cancelTime = DateTime.Now;
        return true;
    }

    /// <summary>
    /// Attaches the task to this context, creates fresh cancellation sources linked to
    /// <paramref name="token"/> and records the start time.
    /// </summary>
    /// <param name="task">The task being tracked.</param>
    /// <param name="token">Service-level cancellation token.</param>
    public void SetActiveTask(System.Threading.Tasks.Task task, CancellationToken token)
    {
        var previousLinked = linkedSource;
        var previousInternal = internalSource;

        activeTask = task;
        internalSource = new CancellationTokenSource();
        linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, internalSource.Token);
        StartDate = DateTime.Now;
        ServiceToken = token;

        // Release the sources of an earlier activation so they are not leaked.
        if (previousLinked != null)
            previousLinked.Dispose();
        if (previousInternal != null)
            previousInternal.Dispose();
    }

    /// <summary>Creates a context for the given task with no task attached yet.</summary>
    public static ActiveTaskContext Create(string taskId, string taskName)
    {
        return new ActiveTaskContext
        {
            TaskId = taskId,
            TaskName = taskName
        };
    }
}
/// <summary>
/// Serves task configurations out of a fixed array supplied at construction time.
/// </summary>
public class TaskProvider : ITaskProvider
{
    private readonly TaskModel[] collection;

    /// <summary>Creates a provider over the given task configurations.</summary>
    public TaskProvider(TaskModel[] collection)
    {
        this.collection = collection;
    }

    /// <summary>
    /// Returns the first configuration whose <c>TaskId</c> equals <paramref name="taskId"/>,
    /// compared ordinally and ignoring case, or <c>null</c> when there is none.
    /// </summary>
    public TaskModel LoadTaskConfiguration(string taskId)
    {
        Predicate<TaskModel> hasRequestedId =
            candidate => candidate.TaskId.Equals(taskId, StringComparison.OrdinalIgnoreCase);

        return Array.Find(collection, hasRequestedId);
    }
}
/// <summary>
/// Holds the next execution time of every scheduled task and the execution plans of
/// container tasks.
/// </summary>
/// <remarks>
/// <c>schedules</c>, <c>considerations</c> and <c>executionPlans</c> are plain collections, so
/// every access to one of them happens inside a lock on that same collection. <c>collection</c>
/// is only written in the constructor and is therefore read without locking.
/// </remarks>
public class ScheduleProvider : IScheduleProvider
{
    private readonly ILogger logger;
    private readonly List<TaskScheduleModel> collection;
    // Ids of every task passed to PushTaskForInitialConsideration.
    private readonly List<string> considerations;
    // Task id -> next execution time.
    private readonly Dictionary<string, DateTime> schedules;
    // Container task id -> ordered child task ids.
    private readonly Dictionary<string, List<string>> executionPlans;

    /// <summary>Creates the provider and schedules every given model for its first run.</summary>
    /// <param name="collection">Schedule definitions of all known tasks.</param>
    public ScheduleProvider(TaskScheduleModel[] collection)
    {
        this.collection = new List<TaskScheduleModel>(collection);
        logger = LoggerFactory.CreateLogger("Scheduler");
        considerations = new List<string>();
        schedules = new Dictionary<string, DateTime>();
        executionPlans = new Dictionary<string, List<string>>();
        PushTasksForInitialConsideration(collection);
    }

    /// <summary>Returns the ids of all tasks whose next execution time is at or before <paramref name="date"/>.</summary>
    public string[] IdentifyTasksToExecute(DateTime date)
    {
        // Materialize inside the lock: enumerating the dictionary while another thread
        // adds or removes an entry throws InvalidOperationException.
        lock (schedules)
        {
            return schedules
                .Where(x => x.Value <= date)
                .Select(x => x.Key)
                .ToArray();
        }
    }

    /// <summary>
    /// Returns a copy of the execution plan stored for <paramref name="taskId"/>, or an empty
    /// array when there is none.
    /// </summary>
    public string[] IdentifyExecutionPlan(string taskId)
    {
        lock (executionPlans)
        {
            List<string> plans;
            if (executionPlans.TryGetValue(taskId, out plans) && plans.Count != 0)
                return plans.ToArray();
            return new string[0];
        }
    }

    /// <summary>Removes the task from the schedule; does nothing when it is not scheduled.</summary>
    public void PopTaskFromConsideration(string taskId)
    {
        bool removed;
        lock (schedules)
        {
            removed = schedules.Remove(taskId);
        }

        // Logged outside the lock so that logging never blocks other schedule access.
        if (removed)
            logger.DebugFormat("Removing task \"{0}\" from consideration.", taskId);
    }

    /// <summary>
    /// Schedules the next run of the task with the given id, unless it is already scheduled or
    /// no schedule definition exists for it.
    /// </summary>
    public void PushTaskForConsideration(string taskId)
    {
        bool alreadyScheduled;
        lock (schedules)
        {
            alreadyScheduled = schedules.ContainsKey(taskId);
        }

        if (alreadyScheduled)
        {
            logger.WarnFormat("Task \"{0}\" has already been scheduled. Task will not be considered.", taskId);
            return;
        }

        var model = collection.FirstOrDefault(x => x.TaskId.Equals(taskId, StringComparison.Ordinal));
        if (model == null)
        {
            bool wasConsidered;
            lock (considerations)
            {
                wasConsidered = considerations.Contains(taskId, StringComparer.OrdinalIgnoreCase);
            }

            // Ids that were never considered are ignored silently; only a previously
            // considered id that cannot be resolved any more is reported.
            if (wasConsidered)
                logger.ErrorFormat("Task \"{0}\" was not found. Task cannot be considered.", taskId);
            return;
        }

        PushTaskForConsideration(model);
    }

    /// <summary>
    /// Stores the next execution time of the task and, for a container, rebuilds its execution plan.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="model"/> is <c>null</c>.</exception>
    public void PushTaskForConsideration(TaskScheduleModel model)
    {
        if (model == null)
            throw new ArgumentNullException("model");

        var executionDate = model.IdentifyNextEntry(DateTime.Now);
        logger.DebugFormat("Task \"{0}\" next execution is: \"{1}\".", model.TaskId, executionDate);

        lock (schedules)
        {
            schedules[model.TaskId] = executionDate;
        }

        if (!model.IsContainer)
            return;

        IdentifyExecutionPlan(model.TaskId, model.ScheduleItems);
    }

    /// <summary>Schedules the first run of every given model.</summary>
    public void PushTasksForInitialConsideration(TaskScheduleModel[] models)
    {
        foreach (var model in models)
            PushTaskForInitialConsideration(model);
    }

    /// <summary>
    /// Schedules the first run of the task; a continuous task is made due immediately.
    /// For a container the execution plan is built as well.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="model"/> is <c>null</c>.</exception>
    public void PushTaskForInitialConsideration(TaskScheduleModel model)
    {
        if (model == null)
            throw new ArgumentNullException("model");

        var executionDate = model.IdentifyNextEntry(DateTime.Now);
        if (model.IsContinuous)
            executionDate = DateTime.Now;
        logger.DebugFormat("Task \"{0}\" next execution is: \"{1}\".", model.TaskId, executionDate);

        lock (considerations)
        {
            considerations.Add(model.TaskId);
        }

        // This method is public, so it takes the same lock as every other schedule writer.
        lock (schedules)
        {
            schedules[model.TaskId] = executionDate;
        }

        if (!model.IsContainer)
            return;

        IdentifyExecutionPlan(model.TaskId, model.ScheduleItems);
    }

    /// <summary>
    /// Stores the child ids of a container, sorted by <c>Order</c>, as its execution plan and
    /// recurses into the children that have schedule items of their own.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="taskId"/> is null, empty or whitespace.</exception>
    private void IdentifyExecutionPlan(string taskId, ICollection<TaskScheduleItemModel> items)
    {
        if (string.IsNullOrWhiteSpace(taskId))
            throw new ArgumentNullException("taskId");
        if (items == null || items.Count == 0)
            return;

        var ordering = items
            .OrderBy(x => x.Order)
            .Select(x => x.TaskId)
            .ToArray();
        logger.DebugFormat("Task \"{0}\" is a container task. Execution plan: \"{1}\".",
            taskId, string.Join(", ", ordering));

        lock (executionPlans)
        {
            executionPlans[taskId] = new List<string>(ordering);
        }

        foreach (var item in items.Where(x => x.HasScheduleItems))
            IdentifyExecutionPlan(item.TaskId, item.ScheduleItems);
    }
}
}
// (GitHub page footer, not part of the source: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")