Skip to content

Instantly share code, notes, and snippets.

@zplume
Last active June 6, 2019 12:46
Show Gist options
  • Save zplume/f1a11d584a9d526dee5f6887528ccbeb to your computer and use it in GitHub Desktop.
Save zplume/f1a11d584a9d526dee5f6887528ccbeb to your computer and use it in GitHub Desktop.
using Microsoft.Azure.WebJobs;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace LS.DurableFunctions.Examples
{
public static class DurableOrchestrationContextExtensions
{
/// <summary>
/// Call activity functions in parallel with a maximum degree of parallelism (e.g. to prevent overloading other services).
/// Adapted from f2bo's code here: https://github.com/Azure/azure-functions-durable-extension/issues/596#issuecomment-459906400
/// </summary>
/// <typeparam name="TResult">The type returned from each instance of the activity function.</typeparam>
/// <typeparam name="TInput">The type of input provided to each instance of the activity function.</typeparam>
/// <param name="context">The DurableOrchestrationContext instance this method is attached to.</param>
/// <param name="functionName">The activity function name.</param>
/// <param name="inputs">The sequence of inputs to map to activity function instances.</param>
/// <param name="degreeOfParallelism">The maximum number of activities to execute in parallel.</param>
public async static Task<IEnumerable<TResult>> CallActivitiesAsync<TResult, TInput>(
this DurableOrchestrationContext context,
string functionName,
IEnumerable<TInput> inputs,
int degreeOfParallelism)
{
var runningActivities = new List<Task<TResult>>(inputs.Count());
foreach (var input in inputs)
{
var pendingOperations = runningActivities.Where(p => !p.IsCompleted);
if (pendingOperations.Count() >= degreeOfParallelism)
{
await Task.WhenAny(pendingOperations);
}
Task<TResult> result = context.CallActivityAsync<TResult>(functionName, input);
runningActivities.Add(result);
}
TResult[] results = await Task.WhenAll(runningActivities);
return results;
}
/// <summary>
/// Call activity functions in parallel with a maximum degree of parallelism (e.g. to prevent overloading other services).
/// Adapted from f2bo's code here: https://github.com/Azure/azure-functions-durable-extension/issues/596#issuecomment-459906400
/// </summary>
/// <typeparam name="TInput">The type of input provided to each instance of the activity function.</typeparam>
/// <param name="context">The DurableOrchestrationContext instance this method is attached to.</param>
/// <param name="functionName">The activity function name.</param>
/// <param name="inputs">The sequence of inputs to map to activity function instances.</param>
/// <param name="degreeOfParallelism">The maximum number of activities to execute in parallel.</param>
public async static Task CallActivitiesAsync<TInput>(
this DurableOrchestrationContext context,
string functionName,
IEnumerable<TInput> inputs,
int degreeOfParallelism)
{
var runningActivities = new List<Task>(inputs.Count());
foreach (var input in inputs)
{
var pendingOperations = runningActivities.Where(p => !p.IsCompleted);
if (pendingOperations.Count() >= degreeOfParallelism)
{
await Task.WhenAny(pendingOperations);
}
Task task = context.CallActivityAsync(functionName, input);
runningActivities.Add(task);
}
await Task.WhenAll(runningActivities);
}
/// <summary>
/// Call sub-orchestration functions in parallel with a maximum degree of parallelism (e.g. to prevent overloading other services).
/// Adapted from f2bo's code here: https://github.com/Azure/azure-functions-durable-extension/issues/596#issuecomment-459906400
/// </summary>
/// <typeparam name="TResult">The type returned from each instance of the sub-orchestrator function.</typeparam>
/// <typeparam name="TInput">The type of input provided to each instance of the sub-orchestrator function.</typeparam>
/// <param name="context">The DurableOrchestrationContext instance this method is attached to.</param>
/// <param name="functionName">The sub-orchestration function name.</param>
/// <param name="inputs">The sequence of inputs to map to sub-orchestration instances.</param>
/// <param name="degreeOfParallelism">The maximum number of sub-orchestrations to execute in parallel.</param>
public async static Task<IEnumerable<TResult>> CallSubOrchestratorsAsync<TResult, TInput>(
this DurableOrchestrationContext context,
string functionName,
IEnumerable<TInput> inputs,
int degreeOfParallelism)
{
var runningSubOrchestrations = new List<Task<TResult>>(inputs.Count());
foreach (var input in inputs)
{
var pendingOperations = runningSubOrchestrations.Where(p => !p.IsCompleted);
if (pendingOperations.Count() >= degreeOfParallelism)
{
await Task.WhenAny(pendingOperations);
}
Task<TResult> result = context.CallSubOrchestratorAsync<TResult>(functionName, input);
runningSubOrchestrations.Add(result);
}
TResult[] results = await Task.WhenAll(runningSubOrchestrations);
return results;
}
/// <summary>
/// Call sub-orchestration functions in parallel with a maximum degree of parallelism (e.g. to prevent overloading other services).
/// Adapted from f2bo's code here: https://github.com/Azure/azure-functions-durable-extension/issues/596#issuecomment-459906400
/// </summary>
/// <typeparam name="TInput">The type of input provided to each instance of the sub-orchestrator function.</typeparam>
/// <param name="context">The DurableOrchestrationContext instance this method is attached to.</param>
/// <param name="functionName">The sub-orchestration function name.</param>
/// <param name="inputs">The sequence of inputs to map to sub-orchestration instances.</param>
/// <param name="degreeOfParallelism">The maximum number of sub-orchestrations to execute in parallel.</param>
public async static Task CallSubOrchestratorsAsync<TInput>(
this DurableOrchestrationContext context,
string functionName,
IEnumerable<TInput> inputs,
int degreeOfParallelism)
{
var runningSubOrchestrations = new List<Task>(inputs.Count());
foreach (var input in inputs)
{
var pendingOperations = runningSubOrchestrations.Where(p => !p.IsCompleted);
if (pendingOperations.Count() >= degreeOfParallelism)
{
await Task.WhenAny(pendingOperations);
}
Task result = context.CallSubOrchestratorAsync(functionName, input);
runningSubOrchestrations.Add(result);
}
await Task.WhenAll(runningSubOrchestrations);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment