Skip to content

Instantly share code, notes, and snippets.

@pksorensen
Created December 20, 2020 00:53
Show Gist options
  • Save pksorensen/410b3b4b3bdc44d1b1979c7688922c2e to your computer and use it in GitHub Desktop.
Save pksorensen/410b3b4b3bdc44d1b1979c7688922c2e to your computer and use it in GitHub Desktop.
public abstract class DurableStateMachineState<TStats>
where TStats : Enum
{
public TStats OrchestratorState { get; set; }
public TimeSpan? DelayExecution { get; set; }
public DateTime StartTime { get; set; }
public int Depth { get; set; } = 0;
public string InstanceId { get; set; }
public Guid InvocationId { get; set; }
}
public class SubOrchestratrationResult<T>
{
public T Value { get; set; }
public Exception Fault { get; set; }
}
public class DurableStateMachineAttribute : Attribute
{
public string statename { get; }
public bool ContinueWhenStateNotChanged { get; set; }
public string TaskHub { get; set; }
public DurableStateMachineAttribute(string statename)
{
this.statename = statename;
}
public DurableStateMachineAttribute()
{
}
}
public abstract class DurableStateMachine<TState, TStates>
where TStates : Enum
where TState : DurableStateMachineState<TStates>
{
private readonly string getInstanceInfoActivityName;
public DurableStateMachine(string GetInstanceInfoActivityName)
{
getInstanceInfoActivityName = GetInstanceInfoActivityName;
}
public virtual string GetActivityName(TStates state)
{
var method = this.GetType().GetMethods()
.FirstOrDefault(m => string.Equals(m.GetCustomAttribute<DurableStateMachineAttribute>()?.statename, state.ToString(), StringComparison.OrdinalIgnoreCase));
var name = method?.GetCustomAttribute<FunctionNameAttribute>()?.Name;
return name;
}
public virtual bool ContinueWhenStateNotChanged(TStates state)
{
var method = this.GetType().GetMethods()
.FirstOrDefault(m => string.Equals(m.GetCustomAttribute<DurableStateMachineAttribute>()?.statename, state.ToString(), StringComparison.OrdinalIgnoreCase));
var cancontinue = method?.GetCustomAttribute<DurableStateMachineAttribute>()?.ContinueWhenStateNotChanged ??false;
return cancontinue;
}
public virtual bool IsInCompletedState(TStates state)
{
var method = this.GetType().GetMethods()
.FirstOrDefault(m => string.Equals(m.GetCustomAttribute<DurableStateMachineAttribute>()?.statename, state.ToString(), StringComparison.OrdinalIgnoreCase));
var name = method?.GetCustomAttribute<FunctionNameAttribute>()?.Name;
return string.IsNullOrEmpty(name);
}
public virtual async Task<TState> RunOrchestrationAsync(Microsoft.Azure.WebJobs.ExecutionContext executionContext, IDurableOrchestrationContext context, ILogger log)
{
var state = context.GetInput<TState>();
state.InstanceId = context.InstanceId;
state.InvocationId = executionContext.InvocationId;
if (state.Depth == 0)
state.StartTime = context.CurrentUtcDateTime;
if (!context.IsReplaying)
log.LogInformation("Running DurableStateMachine Orchestrator for {InstanceId} with {State}", context.InstanceId, state.OrchestratorState.ToString());
if (state.DelayExecution.HasValue)
{
await context.CreateTimer(context.CurrentUtcDateTime.Add(state.DelayExecution.Value),CancellationToken.None);
}
var oldState = state.OrchestratorState;
await OnBeforeActivityCallAsync(context, state, log);
var status = await context.CallActivityWithRetryAsync<DurableOrchestrationStatus[]>(
getInstanceInfoActivityName,
new RetryOptions(TimeSpan.FromSeconds(30), Debugger.IsAttached ? 1 : 3),
new OrchestrationStatusQueryCondition
{
InstanceIdPrefix = context.InstanceId
});
// var childs = status.Select(c => new { c.InstanceId, RuntimeStatus = c.RuntimeStatus.ToString(), c.LastUpdatedTime }).ToArray();
context.SetCustomStatus(new
{
depth = state.Depth,
duration = context.CurrentUtcDateTime - state.StartTime,
childs = status.GroupBy(k => k.RuntimeStatus).ToDictionary(k => k.Key, v => v.Count()),
state = oldState.ToString(),
processing = true,
});
while (true)
{
var name = GetActivityName(state.OrchestratorState);
log.LogInformation("DurableStateMachine Orchestrator calling {ActivityName} for {InstanceId} with {@State}", name, context.InstanceId, state);
state = await context.CallActivityWithRetryAsync<TState>(
name,
new RetryOptions(TimeSpan.FromSeconds(30), Debugger.IsAttached ? 1 : 3)
{
Handle = (ex) =>
{
log.LogWarning("Retrying Activity for {activityName}: {message}", name, ex.Message);
return true;
}
},
state);
var newName = GetActivityName(state.OrchestratorState);
if(name != newName || !ContinueWhenStateNotChanged(state.OrchestratorState))
{
break;
}
}
var isCompleted = IsInCompletedState(state.OrchestratorState);
context.SetCustomStatus(new
{
depth = state.Depth,
duration = context.CurrentUtcDateTime - state.StartTime,
childs = status.GroupBy(k => k.RuntimeStatus).ToDictionary(k => k.Key, v => v.Count()),
state = oldState.ToString(),
newState = state.OrchestratorState.ToString(),
isCompleted,
});
await context.CallActivityWithRetryAsync<DurableOrchestrationStatus[]>(
"PersistHistoryInformation",
new RetryOptions(TimeSpan.FromSeconds(30), Debugger.IsAttached ? 1 : 3),
new InstanceInfoRequest
{
InstanceId = context.InstanceId,
TaskHub = this.GetType().GetCustomAttribute<DurableStateMachineAttribute>().TaskHub,
InvocationId = state.InvocationId,
State = state.OrchestratorState.ToString()
});
if (!isCompleted)
{
state.Depth = Convert.ToInt32(oldState) == Convert.ToInt32(state.OrchestratorState) ? state.Depth + 1 : 0;
context.ContinueAsNew(state);
log.LogInformation("DurableStateMachine Orchestrator countinuing as new for {InstanceId} with {@State}", context.InstanceId, state);
}
return state;
}
protected virtual Task OnBeforeActivityCallAsync(IDurableOrchestrationContext context, TState state, ILogger log)
{
return Task.CompletedTask;
}
protected async virtual Task<SubOrchestratrationResult<T>[]> RunSubOrchestratorsAsync<T>(IDurableOrchestrationContext context, TState state, Task<T>[] tasks) where T : class
{
// var tasks = GetSubOrchestrators(context, state);
var queue = new List<Task<T>>(tasks);
while (queue.Any())
{
context.SetCustomStatus(new
{
depth = state.Depth,
duration = context.CurrentUtcDateTime - state.StartTime,
state = state.OrchestratorState,
suborchestrating = true,
total = tasks.Length,
done = tasks.Length - queue.Count
});
var any = await Task.WhenAny(queue);
queue.Remove(any);
}
if (tasks.Any(k => !k.IsCompletedSuccessfully))
{
throw new AggregateException("Failed to Run SubOrchestratorsAsync", tasks.Where(t => !t.IsCompletedSuccessfully).Select(k => k.Exception));
}
return tasks.Select(t => new SubOrchestratrationResult<T>
{
Value = t.IsCompletedSuccessfully ? t.Result : null as T,
Fault = t.Exception
}).ToArray();
// return await Task.WhenAll(tasks);
}
}
public class InstanceInfoRequest
{
public string InstanceId { get; set; }
public string TaskHub { get; set; }
public string State { get; set; }
public Guid InvocationId { get; set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment