Skip to content

Instantly share code, notes, and snippets.

@justinobney
Created June 22, 2024 17:07
Show Gist options
  • Save justinobney/13faab60f196e8a3088c57782238dcc2 to your computer and use it in GitHub Desktop.
Save justinobney/13faab60f196e8a3088c57782238dcc2 to your computer and use it in GitHub Desktop.
// Script entry point: wires up a two-step demo job (TransformFile, then
// UploadToBlob), subscribes console loggers to the job's task events,
// validates the task chain and, if it is valid, runs the job.
async Task Main()
{
    // The first step has no predecessor (Guid.Empty); the second step is
    // chained to the first through the first step's TaskId.
    var transformStep = new TransformFile(
        new TransformFileSettings { BlobPath = "some/path", BusinessUnitId = 1 },
        Guid.Empty);
    var uploadStep = new UploadToBlob(
        new UploadToBlobSettings { ContainerName = "some-container" },
        transformStep.TaskId);

    var job = new JobExecution(
        new List<IJobExecutionTask> { transformStep, uploadStep },
        new Dictionary<string, string>());

    // Report task lifecycle events on the console.
    job.OnTaskStarted += (source, e) =>
        Console.WriteLine($"[Event] Task {e.TaskType} has started.");
    job.OnTaskCompleted += (source, e) =>
        Console.WriteLine($"[Event] Task {e.TaskType} has completed.");

    if (job.ValidateTaskTree())
    {
        await job.Run();
        return;
    }

    Console.WriteLine("Invalid task tree. Exiting...");
}
/// <summary>
/// Persisted state of a single job execution. <c>JobExecution</c> creates one
/// instance per job and stores it in its in-memory dictionary keyed by
/// <see cref="Identifier"/>.
/// </summary>
public class JobExecutionStateDto
{
// Never assigned anywhere in this file; presumably a storage-side key — TODO confirm.
public int Id { get; set; }
// Free-form status text; JobExecution writes values such as "Initialized",
// "In Progress" and "Completed".
public string ExecutionStatus { get; set; }
// Unique identifier of the job execution; used as the dictionary key by JobExecution.
public Guid Identifier { get; set; }
// JSON serialization of the job's Context dictionary, refreshed by
// JobExecution.Run after each task finishes.
public string ContextSnapshotJson { get; set; }
}
/// <summary>
/// A single unit of work that a job execution runs as part of a task chain.
/// </summary>
public interface IJobExecutionTask
{
// Identifier of this task; other tasks reference it through ExecuteAfter.
Guid TaskId { get; }
// Identifier this task waits on. JobExecution treats Guid.Empty as
// "no predecessor", i.e. the task that starts the chain.
Guid ExecuteAfter { get; }
// Performs the task's work for the given execution. The returned Guid is the
// task's outcome: JobExecution.Run looks for the task whose ExecuteAfter
// equals it and runs that task next.
Task<Guid> ExecuteAsync(IJobExecution execution);
}
/// <summary>
/// Job task representing a file transformation step. The work is currently
/// simulated: it logs to the console, waits one second and reports its own
/// <see cref="TaskId"/> as the outcome.
/// </summary>
public class TransformFile : IJobExecutionTask
{
    /// <summary>
    /// Creates the task with a freshly generated <see cref="TaskId"/>.
    /// </summary>
    /// <param name="settings">Settings carried by this task.</param>
    /// <param name="executeAfter">
    /// Identifier this task should run after; <c>Guid.Empty</c> when it has no predecessor.
    /// </param>
    public TransformFile(TransformFileSettings settings, Guid executeAfter)
    {
        this.Settings = settings;
        this.ExecuteAfter = executeAfter;
        this.TaskId = Guid.NewGuid();
    }

    /// <summary>Unique identifier generated when the task is constructed.</summary>
    public Guid TaskId { get; }

    /// <summary>Identifier this task runs after.</summary>
    public Guid ExecuteAfter { get; }

    /// <summary>Settings supplied at construction.</summary>
    public TransformFileSettings Settings { get; }

    /// <summary>
    /// Simulates the transformation work.
    /// </summary>
    /// <param name="execution">The owning job execution (not used by the simulation).</param>
    /// <returns>This task's <see cref="TaskId"/>.</returns>
    public async Task<Guid> ExecuteAsync(IJobExecution execution)
    {
        // Stand-in for the real work: one second of asynchronous delay.
        const int simulatedWorkMilliseconds = 1000;

        Console.WriteLine("TransformFile. ExecuteAsync...");
        Task simulatedWork = Task.Delay(simulatedWorkMilliseconds);
        await simulatedWork;

        return this.TaskId;
    }
}
/// <summary>
/// Job task representing an upload-to-blob step. The work is currently
/// simulated: it logs to the console, waits one second and reports its own
/// <see cref="TaskId"/> as the outcome.
/// </summary>
public class UploadToBlob : IJobExecutionTask
{
    /// <summary>
    /// Creates the task with a freshly generated <see cref="TaskId"/>.
    /// </summary>
    /// <param name="settings">Settings carried by this task.</param>
    /// <param name="executeAfter">
    /// Identifier this task should run after; <c>Guid.Empty</c> when it has no predecessor.
    /// </param>
    public UploadToBlob(UploadToBlobSettings settings, Guid executeAfter)
    {
        this.Settings = settings;
        this.ExecuteAfter = executeAfter;
        this.TaskId = Guid.NewGuid();
    }

    /// <summary>Unique identifier generated when the task is constructed.</summary>
    public Guid TaskId { get; }

    /// <summary>Identifier this task runs after.</summary>
    public Guid ExecuteAfter { get; }

    /// <summary>Settings supplied at construction.</summary>
    public UploadToBlobSettings Settings { get; }

    /// <summary>
    /// Simulates the upload work.
    /// </summary>
    /// <param name="execution">The owning job execution (not used by the simulation).</param>
    /// <returns>This task's <see cref="TaskId"/>.</returns>
    public async Task<Guid> ExecuteAsync(IJobExecution execution)
    {
        // Stand-in for the real work: one second of asynchronous delay.
        const int simulatedWorkMilliseconds = 1000;

        Console.WriteLine("UploadToBlob. ExecuteAsync...");
        Task simulatedWork = Task.Delay(simulatedWorkMilliseconds);
        await simulatedWork;

        return this.TaskId;
    }
}
/// <summary>
/// Settings object passed to the <c>TransformFile</c> task.
/// </summary>
public class TransformFileSettings
{
// Not read anywhere in this file; presumably the path of the blob to transform — TODO confirm.
public string BlobPath { get; set; }
// Not read anywhere in this file; presumably the business unit the file belongs to — TODO confirm.
public int BusinessUnitId { get; set; }
}
/// <summary>
/// Settings object passed to the <c>UploadToBlob</c> task.
/// </summary>
public class UploadToBlobSettings
{
// Not read anywhere in this file; presumably the destination blob container name — TODO confirm.
public string ContainerName { get; set; }
}
/// <summary>
/// The view of a running job that is handed to each task's <c>ExecuteAsync</c>.
/// </summary>
public interface IJobExecution
{
// String key/value bag attached to the job; replaceable through the setter.
Dictionary<string, string> Context { get; set; }
// Current status text of the job.
string Status { get; }
// Runs the job asynchronously.
Task Run();
}
/// <summary>
/// Runs a chain of <see cref="IJobExecutionTask"/> instances sequentially and
/// records the job's state in a simulated in-memory database.
/// </summary>
/// <remarks>
/// Tasks are linked by outcome: execution starts with the outcome
/// <c>Guid.Empty</c>, runs the task whose <c>ExecuteAfter</c> equals the current
/// outcome, and then uses the Guid returned by that task as the next outcome.
/// The job ends when no task is registered for the current outcome.
/// </remarks>
public class JobExecution : IJobExecution
{
    // Status texts written to ExecutionState.ExecutionStatus.
    private const string StatusInitialized = "Initialized";
    private const string StatusInProgress = "In Progress";
    private const string StatusCompleted = "Completed";
    private const string StatusFailed = "Failed";

    /// <summary>Raised immediately before a task's <c>ExecuteAsync</c> is invoked.</summary>
    public event EventHandler<TaskEventArgs> OnTaskStarted;

    /// <summary>Raised immediately after a task's <c>ExecuteAsync</c> has completed.</summary>
    public event EventHandler<TaskEventArgs> OnTaskCompleted;

    // Simulated in-memory database; static, so it is shared by every JobExecution instance.
    private static readonly Dictionary<Guid, JobExecutionStateDto> _db = new Dictionary<Guid, JobExecutionStateDto>();

    private readonly List<IJobExecutionTask> _tasks;

    /// <summary>Key/value bag attached to the job; snapshotted as JSON after each task.</summary>
    public Dictionary<string, string> Context { get; set; }

    /// <summary>Current status text of the job.</summary>
    public string Status => ExecutionState.ExecutionStatus;

    /// <summary>State record stored in the in-memory database for this job.</summary>
    public JobExecutionStateDto ExecutionState { get; private set; }

    /// <summary>
    /// Creates a job in the "Initialized" state and registers it in the in-memory database.
    /// </summary>
    /// <param name="tasks">Tasks that make up the job.</param>
    /// <param name="context">Initial job context.</param>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is null.</exception>
    public JobExecution(List<IJobExecutionTask> tasks, Dictionary<string, string> context)
    {
        // Fail at construction instead of with a NullReferenceException during validation.
        if (tasks == null)
        {
            throw new ArgumentNullException(nameof(tasks));
        }

        _tasks = tasks;
        Context = context;
        ExecutionState = new JobExecutionStateDto
        {
            Identifier = Guid.NewGuid(),
            ExecutionStatus = StatusInitialized
        };
        _db[ExecutionState.Identifier] = ExecutionState;
    }

    /// <summary>
    /// Checks that the task list can be executed by <see cref="Run"/>.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the list is empty, or when it contains no null entries,
    /// every <c>TaskId</c> is unique, every non-empty <c>ExecuteAfter</c> refers
    /// to a task in the list, no two tasks share the same <c>ExecuteAfter</c>,
    /// and one task has an empty <c>ExecuteAfter</c> to start the chain;
    /// otherwise <c>false</c> (the reason is written to the console).
    /// </returns>
    public bool ValidateTaskTree()
    {
        // Collect all task IDs for quick lookup, rejecting nulls and duplicates on the way.
        var taskIds = new HashSet<Guid>();
        foreach (var task in _tasks)
        {
            if (task == null)
            {
                Console.WriteLine("Validation Failed: Task list contains a null entry");
                return false;
            }
            if (!taskIds.Add(task.TaskId))
            {
                Console.WriteLine($"Validation Failed: Duplicate task ID {task.TaskId}");
                return false;
            }
        }

        var predecessors = new HashSet<Guid>();
        foreach (var task in _tasks)
        {
            // Orphan check: the predecessor must be one of the tasks in this job.
            if (task.ExecuteAfter != Guid.Empty && !taskIds.Contains(task.ExecuteAfter))
            {
                Console.WriteLine($"Validation Failed: Task with ID {task.TaskId} has invalid ExecuteAfter value {task.ExecuteAfter}");
                return false;
            }
            // Run keeps a single successor per outcome, so a second task with the
            // same ExecuteAfter would silently replace the first and one would never run.
            if (!predecessors.Add(task.ExecuteAfter))
            {
                Console.WriteLine($"Validation Failed: More than one task has ExecuteAfter value {task.ExecuteAfter}");
                return false;
            }
        }

        // Run always starts from Guid.Empty; without such a task nothing would execute.
        if (_tasks.Count > 0 && !predecessors.Contains(Guid.Empty))
        {
            Console.WriteLine("Validation Failed: No task has an empty ExecuteAfter value, so the job has no starting task");
            return false;
        }

        return true;
    }

    /// <summary>
    /// Validates the task list and then executes the tasks one after another,
    /// raising <see cref="OnTaskStarted"/> / <see cref="OnTaskCompleted"/> around
    /// each task and persisting a JSON snapshot of <see cref="Context"/> after each one.
    /// </summary>
    /// <remarks>
    /// The status is "In Progress" while tasks run, "Completed" when the chain
    /// ends, and "Failed" when a task or event handler throws; the exception is
    /// rethrown to the caller. If validation fails the job is not started.
    /// </remarks>
    public async Task Run()
    {
        if (!ValidateTaskTree())
        {
            Console.WriteLine("Job aborted due to invalid task tree.");
            return;
        }
        Console.WriteLine($"Job started. Status: {Status}");

        // Map each outcome to the task that should execute after it. Storing the
        // task itself avoids a linear search of the task list on every step.
        var successors = new Dictionary<Guid, IJobExecutionTask>(_tasks.Count);
        foreach (var task in _tasks)
        {
            successors[task.ExecuteAfter] = task;
        }

        // Mark the job as running before the first task starts, so handlers and
        // tasks observe a consistent status for every step.
        PersistState(StatusInProgress);

        try
        {
            Guid currentTaskOutcome = Guid.Empty;
            IJobExecutionTask nextTask;
            while (successors.TryGetValue(currentTaskOutcome, out nextTask))
            {
                var taskType = nextTask.GetType().Name;

                OnTaskStarted?.Invoke(this, new TaskEventArgs(taskType));
                currentTaskOutcome = await nextTask.ExecuteAsync(this);
                OnTaskCompleted?.Invoke(this, new TaskEventArgs(taskType));

                // Persist a snapshot of the context after every completed task.
                ExecutionState.ContextSnapshotJson = JsonConvert.SerializeObject(Context);
                PersistState(StatusInProgress);
            }
        }
        catch
        {
            // Do not leave a crashed job reporting "In Progress" forever.
            PersistState(StatusFailed);
            throw;
        }

        PersistState(StatusCompleted);
        Console.WriteLine($"Job completed. Status: {Status}");
    }

    // Sets the job status and writes the state record to the in-memory database.
    private void PersistState(string status)
    {
        ExecutionState.ExecutionStatus = status;
        _db[ExecutionState.Identifier] = ExecutionState;
    }
}
/// <summary>
/// Event payload carrying the type name of the task an event refers to.
/// </summary>
public class TaskEventArgs : EventArgs
{
    /// <summary>
    /// Creates the event payload.
    /// </summary>
    /// <param name="taskType">Type name of the task the event is about.</param>
    public TaskEventArgs(string taskType)
    {
        this.TaskType = taskType;
    }

    /// <summary>Type name of the task the event is about.</summary>
    public string TaskType { get; set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment