@bryanknox
Created March 23, 2020 19:19
Use paging to process the statuses of many Azure Durable Functions orchestration instances.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
namespace K0x.DurableFunctionsHelpers
{
public static class DurableOrchestrationStatusProcessor
{
/// <summary>
/// Processes DurableOrchestrationStatus in PageSize batches by iterating through
/// the following steps:
/// 1) Find DurableOrchestrationStatus in the given client context based on the
/// given condition.
/// 2) Filter the statuses found using the given filter delegate.
/// 3) Process the filtered statuses using the given processor delegate.
/// </summary>
/// <param name="client">
/// The IDurableOrchestrationClient context to use to call the GetStatusAsync(..) method
/// to perform the find.
/// </param>
/// <param name="condition">
/// The query condition for the find. It is passed into the GetStatusAsync(..) method
/// of the given client. Paging is used. Specify the PageSize in the condition
/// to control how many statuses are returned in each iteration of the status query.
/// Larger page sizes require more memory and send more data across the network per call.
/// Smaller page sizes require more calls to GetStatusAsync(..) and to the filter and
/// process delegates (more iterations).
/// </param>
/// <param name="filter">
/// A delegate for filtering the DurableOrchestrationStatus returned from each iteration
/// of the find. It accepts an IEnumerable&lt;DurableOrchestrationStatus&gt; and returns
/// the filtered results as an IEnumerable&lt;DurableOrchestrationStatus&gt;.
/// </param>
/// <param name="process">
/// A delegate that processes the filtered DurableOrchestrationStatus.
/// It accepts an IEnumerable&lt;DurableOrchestrationStatus&gt;.
/// </param>
/// <param name="cancellationToken">
/// The cancellation token that can be used to cancel the status find operation
/// and iterations.
/// </param>
public static async Task ProcessPagedStatusAsync(
IDurableOrchestrationClient client,
OrchestrationStatusQueryCondition condition,
Func<IEnumerable<DurableOrchestrationStatus>, IEnumerable<DurableOrchestrationStatus>> filter,
Action<IEnumerable<DurableOrchestrationStatus>> process,
CancellationToken cancellationToken)
{
string continuationToken = null;
do
{
condition.ContinuationToken = continuationToken;
OrchestrationStatusQueryResult queryResult = await client.GetStatusAsync(
condition,
cancellationToken);
IEnumerable<DurableOrchestrationStatus> foundInPage = filter(
queryResult.DurableOrchestrationState);
process(foundInPage);
continuationToken = queryResult.ContinuationToken;
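} while (
// Stop when there is no next page. A null or empty token is treated as the end
// of the results, in addition to the "bnVsbA==" sentinel (base64 for "null").
// Without the null check, a null token would restart the query from the first page.
!string.IsNullOrEmpty(continuationToken)
&& continuationToken != "bnVsbA=="
&& !cancellationToken.IsCancellationRequested);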
}
}
}
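The find/filter/process loop above can be tried without an Azure host. The following is a minimal sketch, not part of the original helper: `Page<T>`, `PagedLoopSketch`, `ProcessPagedAsync`, and `FakeSource` are hypothetical names introduced only for illustration, and the fake source is assumed to signal the last page with a null continuation token.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for OrchestrationStatusQueryResult: one page of items
// plus the token for the next page (null when there are no more pages).
public sealed class Page<T>
{
    public Page(IReadOnlyList<T> items, string continuationToken)
    {
        Items = items;
        ContinuationToken = continuationToken;
    }

    public IReadOnlyList<T> Items { get; }
    public string ContinuationToken { get; }
}

public static class PagedLoopSketch
{
    // Same loop shape as ProcessPagedStatusAsync, but the page source is a
    // delegate, so no IDurableOrchestrationClient is needed.
    // Returns the number of pages fetched.
    public static async Task<int> ProcessPagedAsync<T>(
        Func<string, CancellationToken, Task<Page<T>>> fetchPage,
        Func<IEnumerable<T>, IEnumerable<T>> filter,
        Action<IEnumerable<T>> process,
        CancellationToken cancellationToken)
    {
        string continuationToken = null;
        int pagesFetched = 0;
        do
        {
            Page<T> page = await fetchPage(continuationToken, cancellationToken);
            pagesFetched++;
            process(filter(page.Items));
            continuationToken = page.ContinuationToken;
        } while (!string.IsNullOrEmpty(continuationToken)
            && !cancellationToken.IsCancellationRequested);
        return pagesFetched;
    }

    // Fake source (an assumption for this sketch): serves the numbers 1..total
    // in pages of pageSize, using the next start index as the continuation token.
    public static Func<string, CancellationToken, Task<Page<int>>> FakeSource(
        int total, int pageSize)
    {
        return (token, ct) =>
        {
            int start = token == null ? 0 : int.Parse(token);
            List<int> items = Enumerable.Range(1, total)
                .Skip(start)
                .Take(pageSize)
                .ToList();
            int next = start + items.Count;
            string nextToken = next < total ? next.ToString() : null;
            return Task.FromResult(new Page<int>(items, nextToken));
        };
    }
}
```

With `FakeSource(7, 3)`, a filter that keeps odd numbers, and a process delegate that appends to a list, the loop fetches three pages and the list ends up holding 1, 3, 5, 7. Passing the page source as a delegate is only one way to make the loop testable; wrapping the client behind an interface would work as well.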