Skip to content

Instantly share code, notes, and snippets.

@xximjasonxx
Created February 16, 2021 15:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xximjasonxx/fb0b7aa216382582ca8e822cda5286df to your computer and use it in GitHub Desktop.
Save xximjasonxx/fb0b7aa216382582ca8e822cda5286df to your computer and use it in GitHub Desktop.
[FunctionName("Orchestrate_Indexing")]
public async Task OrchestrateIndexing(
[OrchestrationTrigger] IDurableOrchestrationContext context,
[Queue(Constants.QUEUE_NOTIFYFOLLOWERS, Connection = Constants.TABLE_STORAGE_CONNECTION)] ICollector<string> eventCollector,
[Queue("durable-function-task-end", Connection = Constants.TABLE_STORAGE_CONNECTION)] ICollector<string> cleanupCollector,
ILogger logger)
{
var channelId = context.GetInput<IndexChannelInput>().ChannelId;
try
{
var phaseOneTasks = new List<Task<DateTime?>>
{
context.CallActivityAsync<DateTime?>("GetTop10Last30Days", channelId),
context.CallActivityAsync<DateTime?>("GetTop25AllTime", channelId)
};
// wait till the channels syncs complete
await Task.WhenAll(phaseOneTasks.ToArray());
phaseOneTasks.RaiseAggregateExceptionForFaults();
// get latest clip value
var latestClip = phaseOneTasks
.Select(x => x.Result)
.Where(x => x.HasValue)
.OrderByDescending(x => x.Value)
.FirstOrDefault();
// is this really part of indexing?
await context.CallActivityAsync("LoadUserProfile",
new LoadUserProfileInput() {ChannelId = channelId, LatestClipStamp = latestClip});
// complete everything - if we have no latest clip value, nothing changed
if (latestClip.HasValue)
{
var msg = $"{channelId}_{latestClip.Value}";
eventCollector.Add(msg);
}
}
catch (AggregateException agex)
{
// one or more tasks failed
foreach (var inEx in agex.InnerExceptions)
{
logger.LogError(new EventId(), inEx, "Orchestration of Start Indexing failed due to exceptions");
}
}
catch (Exception ex)
{
logger.LogError(new EventId(), ex, "Orchestration of Start Indexing failed due to non-activity exception");
}
finally
{
cleanupCollector.Add($"{channelId}-index");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment