Skip to content

Instantly share code, notes, and snippets.

@BryanWilhite
Created December 22, 2019 06:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BryanWilhite/184501cfaa35cfd0a596535a4179c69f to your computer and use it in GitHub Desktop.
Save BryanWilhite/184501cfaa35cfd0a596535a4179c69f to your computer and use it in GitHub Desktop.
PlayerYouTubeDurableFunctions.cs
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Songhay.Diagnostics;
using Songhay.Extensions;
using Songhay.Models;
using Songhay.Player.Activities;
using Songhay.Player.Models;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
namespace Songhay.Player.Functions
{
public static class PlayerYouTubeDurableFunctions
{
static PlayerYouTubeDurableFunctions()
{
TraceSources.ConfiguredTraceSourceName = $"trace-{nameof(PlayerYouTubeDurableFunctions)}";
traceSource = TraceSources
.Instance
.GetConfiguredTraceSource()
.WithSourceLevels()
.EnsureTraceSource();
}
static readonly TraceSource traceSource;
[FunctionName(FUNC_NAME_ORCH_TRIGGER)]
public static async Task RunPlayerYouTubeAsync(
[TimerTrigger(NCRONTAB_ONCE_EVERY_SIX_HOURS)] TimerInfo timerInfo,
[OrchestrationClient] DurableOrchestrationClient client,
ILogger log)
{
log?.LogInformation($"{FUNC_NAME_ORCH_TRIGGER}: {nameof(RunPlayerYouTubeAsync)} invoked...");
string instanceId = await client.StartNewAsync(FUNC_NAME_ORCH, null);
log?.LogInformation($"{FUNC_NAME_ORCH_TRIGGER}: orchestration started [{nameof(instanceId)}: {instanceId}].");
}
[FunctionName(FUNC_NAME_ORCH)]
public static async Task<string> RunPlayerYouTubeOrchestration(
[OrchestrationTrigger] DurableOrchestrationContext orchestrationContext,
ILogger log)
{
log?.LogInformation($"{FUNC_NAME_ORCH}: {nameof(RunPlayerYouTubeOrchestration)} invoked...");
log?.LogInformation($"{FUNC_NAME_ORCH}: {nameof(DurableOrchestrationContextBase.InstanceId)} {orchestrationContext.InstanceId}");
log?.LogInformation($"{FUNC_NAME_ORCH}: calling {FUNC_NAME_ORCH_FUNC0}.c ..");
var jsonBlobs = await orchestrationContext
.CallActivityAsync<IEnumerable<string>>(FUNC_NAME_ORCH_FUNC0, null);
if ((jsonBlobs == null) || !jsonBlobs.Any())
{
var errorMessage = $"{FUNC_NAME_ORCH}: The expected JSON blobs from `{FUNC_NAME_ORCH_FUNC0}` are not here.";
log?.LogError(errorMessage);
throw new NullReferenceException(errorMessage);
}
if (!orchestrationContext.IsReplaying)
{
var tasks = jsonBlobs.Select(json =>
{
log?.LogInformation($"{FUNC_NAME_ORCH}: calling {FUNC_NAME_ORCH_FUNC1}...");
return orchestrationContext.CallActivityAsync(FUNC_NAME_ORCH_FUNC1, json);
});
log?.LogInformation($"{FUNC_NAME_ORCH}: fan-out starting...");
await Task.WhenAll(tasks);
}
log?.LogInformation($"{FUNC_NAME_ORCH}: fan-out complete.");
log?.LogInformation($"{FUNC_NAME_ORCH}: calling {FUNC_NAME_ORCH_FUNC2}...");
await orchestrationContext.CallActivityAsync(FUNC_NAME_ORCH_FUNC2, null);
return orchestrationContext.InstanceId;
}
[FunctionName(FUNC_NAME_ORCH_FUNC0)]
public static async Task<IEnumerable<string>> RunPlayerYouTubeChannelBlobsActivity(
[ActivityTrigger] DurableActivityContext context,
ExecutionContext executionContext,
ILogger log)
{
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC0}: {nameof(RunPlayerYouTubeUploadActivity)} invoked...");
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC0}: {nameof(DurableActivityContext.InstanceId)} {context.InstanceId}");
var configuration = GetConfiguration(executionContext);
var args = new[] { nameof(PlayerYouTubeChannelBlobsActivity) };
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC0}: getting `{args.First()}` {nameof(IActivity)}...");
var activity = GetActivity(args, log)
.WithConfiguration(configuration)
.ToActivityWithTask<ProgramArgs, IEnumerable<string>>();
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC0}: starting `{args.First()}` {nameof(IActivity)}...");
var activityOutput = await activity
.StartActivityAsync<ProgramArgs, IEnumerable<string>>(new ProgramArgs(args), traceSource);
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC0}: getting `{args.First()}` {nameof(IActivity)} log...");
log?.LogInformation(activityOutput?.Log ?? "[null]");
return activityOutput.Output;
}
[FunctionName(FUNC_NAME_ORCH_FUNC1)]
public static async Task RunPlayerYouTubeUploadActivity(
[ActivityTrigger] DurableActivityContext context,
ExecutionContext executionContext,
ILogger log)
{
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC1}: {nameof(RunPlayerYouTubeUploadActivity)} invoked...");
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC1}: {nameof(DurableActivityContext.InstanceId)} {context.InstanceId}");
var blobItem = context.GetInput<string>();
if (string.IsNullOrWhiteSpace(blobItem))
{
var errorMessage = $"{FUNC_NAME_ORCH_FUNC1}: The expected JSON blob from `{FUNC_NAME_ORCH_FUNC0}` is not here.";
log?.LogError(errorMessage);
throw new NullReferenceException(errorMessage);
}
var configuration = GetConfiguration(executionContext);
var args = new[] { nameof(PlayerYouTubeUploadActivity) };
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC1}: getting `{args.First()}` {nameof(IActivity)}...");
var activity = GetActivity(args, log)
.WithConfiguration(configuration)
.ToActivityWithTask<string>();
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC1}: starting `{args.First()}` {nameof(IActivity)}...");
var activityLog = string.Empty;
try
{
activityLog = await activity.StartActivityAsync<string>(blobItem, traceSource);
}
catch (Exception ex)
{
log?.LogError($"{FUNC_NAME_ORCH_FUNC1} ERROR: {ex.Message}");
log?.LogError(ex.StackTrace);
}
finally
{
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC1}: getting `{args.First()}` {nameof(IActivity)} log...");
log?.LogInformation(activityLog ?? "[null]");
}
}
[FunctionName(FUNC_NAME_ORCH_FUNC2)]
public static async Task RunPlayerYouTubeIndicesActivity(
[ActivityTrigger] DurableActivityContext context,
ExecutionContext executionContext,
ILogger log)
{
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC2}: {nameof(RunPlayerYouTubeIndicesActivity)} invoked...");
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC2}: {nameof(DurableActivityContext.InstanceId)} {context.InstanceId}");
var configuration = GetConfiguration(executionContext);
var args = new[] { nameof(PlayerYouTubeIndicesActivity) };
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC2}: getting `{args.First()}` {nameof(IActivity)}...");
var activity = GetActivity(args, log)
.WithConfiguration(configuration)
.ToActivityWithTask();
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC2}: starting `{args.First()}` {nameof(IActivity)}...");
var activityLog = await activity.StartActivityAsync(traceSource);
log?.LogInformation($"{FUNC_NAME_ORCH_FUNC2}: getting `{args.First()}` {nameof(IActivity)} log...");
log?.LogInformation(activityLog ?? "[null]");
}
internal static IActivity GetActivity(string[] args, ILogger log)
{
var getter = new PlayerActivitiesGetter(args);
var activity = getter.GetActivity();
if (activity == null)
{
var errorMessage = $"{FUNC_NAME_ORCH_FUNC1}: the expected Activity is not here [{nameof(args)}: `{string.Join(',', args)}`].";
log?.LogError(errorMessage);
throw new NullReferenceException(errorMessage);
}
return activity;
}
internal static IConfigurationRoot GetConfiguration(ExecutionContext executionContext)
{
if (executionContext == null)
throw new NullReferenceException($"The expected {nameof(ExecutionContext)} is not here.");
var configuration = ProgramUtility.LoadConfiguration(
executionContext.FunctionAppDirectory,
builder => builder.AddJsonFile($"./{AppScalars.ConventionalSettingsFile}"));
return configuration;
}
const string FUNC_NAME_ORCH_TRIGGER = "PlayerYouTubeOrchestrationTrigger";
const string FUNC_NAME_ORCH = "PlayerYouTubeOrchestration";
const string FUNC_NAME_ORCH_FUNC0 = "PlayerYouTubeChannelBlobsActivity";
const string FUNC_NAME_ORCH_FUNC1 = "PlayerYouTubeUploadsActivity";
const string FUNC_NAME_ORCH_FUNC2 = "PlayerYouTubeIndicesActivity";
/// <summary>
/// NCRONTAB expression
/// </summary>
/// <remarks>
/// 📚see https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-timer?tabs=csharp#ncrontab-expressions
/// </remarks>
const string NCRONTAB_ONCE_EVERY_SIX_HOURS = "0 0 */6 * * *";
/// <summary>
/// NCRONTAB expression for testing
/// </summary>
const string NCRONTAB_ONE_TIME_PER_DAY = "0 8 01 * * *";
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment