Skip to content

Instantly share code, notes, and snippets.

@pksorensen
Created December 28, 2020 08:28
Show Gist options
  • Save pksorensen/e40764ec6796ead38246f0a187f0d044 to your computer and use it in GitHub Desktop.
Save pksorensen/e40764ec6796ead38246f0a187f0d044 to your computer and use it in GitHub Desktop.
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Identity.Client;
using Microsoft.PowerPlatform.Cds.Client;
using Microsoft.PowerPlatform.Cds.Client.Utils;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Messages;
using Microsoft.Xrm.Sdk.Query;
using Newtonsoft.Json.Linq;
using Polly;
using Polly.Retry;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Sockets;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.Text;
using System.Threading.Tasks;
using System.Xml.Linq;
namespace DotNetDevOps.Extensions.PowerPlatform.DataVerse
{
/// <summary>
/// Acquires access tokens for the environment configured under <c>CDSEnvironment</c>
/// using the client id / client secret configured under <c>CDSClientId</c> and
/// <c>CDSClientSecret</c>.
/// </summary>
public class TokenService
{
    private const string BearerPrefix = "Bearer ";

    private readonly IConfiguration configuration;
    private readonly IHttpClientFactory httpClientFactory;

    // Built on first use, once the tenant id has been discovered, and reused afterwards.
    private IConfidentialClientApplication app = null;

    /// <summary>Creates the service.</summary>
    /// <param name="configuration">Source of the <c>CDSEnvironment</c>, <c>CDSClientId</c> and <c>CDSClientSecret</c> values.</param>
    /// <param name="httpClientFactory">Factory for the client used by the tenant discovery request.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public TokenService(IConfiguration configuration, IHttpClientFactory httpClientFactory)
    {
        this.configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
        this.httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
    }

    /// <summary>
    /// Returns an access token for the configured environment.
    /// </summary>
    /// <param name="arg">Not used; present so the method can be passed as a <c>Func&lt;string, Task&lt;string&gt;&gt;</c> token provider.</param>
    /// <returns>The access token string.</returns>
    /// <exception cref="InvalidOperationException">The tenant could not be discovered from the environment's response.</exception>
    public async Task<string> GetTokenAsync(string arg)
    {
        var authority = new Uri(configuration.GetValue<string>("CDSEnvironment")).GetLeftPart(UriPartial.Authority);

        if (app == default)
        {
            var tenantId = await DiscoverTenantIdAsync(authority);
            app = ConfidentialClientApplicationBuilder.Create(configuration.GetValue<string>("CDSClientId"))
                .WithTenantId(tenantId)
                .WithClientSecret(configuration.GetValue<string>("CDSClientSecret"))
                .Build();
        }

        // NOTE(review): the double slash before ".default" is kept exactly as originally written;
        // presumably it matches a resource identifier that ends in a slash - confirm before changing.
        var token = await app.AcquireTokenForClient(new[]
            {
                authority.TrimEnd('/') + "//.default"
            })
            .ExecuteAsync();
        return token.AccessToken;
    }

    // Reads the tenant id from the authorization_uri advertised in the WWW-Authenticate
    // header of a Web API response (no credentials are attached to this request here).
    private async Task<string> DiscoverTenantIdAsync(string authority)
    {
        // The response is only needed for its headers; dispose it so the connection is released.
        using (var rsp = await httpClientFactory.CreateClient().GetAsync($"{authority}/api/data/v9.1/accounts"))
        {
            if (!rsp.Headers.TryGetValues("www-authenticate", out var challenges))
            {
                throw new InvalidOperationException(
                    $"Tenant discovery failed: {authority} did not return a WWW-Authenticate header (status {(int)rsp.StatusCode}).");
            }

            var auth = challenges.FirstOrDefault();
            if (string.IsNullOrEmpty(auth) || !auth.StartsWith(BearerPrefix, StringComparison.OrdinalIgnoreCase))
            {
                throw new InvalidOperationException(
                    $"Tenant discovery failed: unexpected WWW-Authenticate header '{auth}'.");
            }

            // Split each "key=value" pair on the first '=' only, so values containing '=' stay intact,
            // and ignore segments that are not key/value pairs instead of failing on them.
            var parameters = auth.Substring(BearerPrefix.Length).Split(',')
                .Select(pair => pair.Trim().Split(new[] { '=' }, 2))
                .Where(kv => kv.Length == 2)
                .ToDictionary(kv => kv[0], kv => kv[1]);

            if (!parameters.TryGetValue("authorization_uri", out var authorizationUri))
            {
                throw new InvalidOperationException(
                    "Tenant discovery failed: the WWW-Authenticate header contains no authorization_uri.");
            }

            // The tenant id is the first path segment of the authorization URI.
            return new Uri(authorizationUri.Trim('"')).AbsolutePath
                .Split('/', StringSplitOptions.RemoveEmptyEntries).FirstOrDefault();
        }
    }
}
/// <summary>
/// Retry policy used for calls against the service: retries forever on the throttling,
/// locking and network failures recognised below, waiting between attempts.
/// </summary>
public class CDSPolly
{
    public static RetryPolicy RetryPolicy = Policy
        .Handle<CdsClientConnectionException>(ShouldHandle)
        .Or<FaultException<OrganizationServiceFault>>(ShouldHandle)
        .OrInner<TimeoutException>()
        .OrInner<SocketException>()
        .WaitAndRetryForever(BackoffTimeProvider, (ex, count, time, context) =>
        {
            LogRetryAttempt(ex, count, time, context);
        });

    // True for the service fault codes that signal request throttling.
    private static bool IsThrottlingError(int errorCode)
    {
        switch (errorCode)
        {
            case -2147015902: //Number of requests exceeded the limit of 6000 over time window of 300 seconds.
            case -2147015903: //Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later.
            case -2147015898: //Number of concurrent requests exceeded the limit of 52.
                return true;
            default:
                return false;
        }
    }

    // Decides whether a fault thrown directly by the service is worth retrying.
    private static bool ShouldHandle(FaultException<OrganizationServiceFault> arg)
    {
        switch (arg.Detail.ErrorCode)
        {
            case -2147088227: //Name: MultipleRecordsFoundByEntityKey
                //Message: More than one record exists for entity {0} with entity key involving attributes {1}
                //(concurrent transaction issue)
                Console.WriteLine(arg.Detail.Message);
                Console.WriteLine(arg.Detail.GetType().Name);
                return true;
            case -2147204295: //CouldNotObtainLockOnResource
                //Database resource lock could not be obtained. For more information, see http://docs.microsoft.com/dynamics365/customer-engagement/customize/best-practices-workflow-processes#limit-the-number-of-workflows-that-update-the-same-entity
                return true;
            default:
                return false;
        }
    }

    // Logs every retry after the first one to the ILogger stored under the "logger" context key, if any.
    private static void LogRetryAttempt(Exception ex, int count, TimeSpan time, Context context)
    {
        if (context.TryGetValue("logger", out var candidate) && candidate is ILogger logger)
        {
            if (count > 1)
            {
                logger.LogInformation("Retrying attempt:{count} with backoff time:{time}", count, time);
            }
        }
    }

    // Wait time before retry number i: the "Retry-After" value from a throttling fault when one is
    // present, otherwise 2^i seconds.
    private static TimeSpan BackoffTimeProvider(int i, Exception ex, Context context)
    {
        if (ex?.InnerException is AggregateException aggregate
            && aggregate.InnerException is FaultException<OrganizationServiceFault> fault
            && IsThrottlingError(fault.Detail.ErrorCode)
            && fault.Detail.ErrorDetails != null
            // An absent or non-TimeSpan "Retry-After" must not throw here: an exception escaping
            // this provider would end the retry loop instead of delaying it.
            && fault.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter)
            && retryAfter is TimeSpan serverDelay)
        {
            return serverDelay;
        }

        return TimeSpan.FromSeconds(Math.Pow(2, i));
    }

    // Decides whether a client connection exception is worth retrying, based on the exception
    // wrapped inside its AggregateException.
    private static bool ShouldHandle(CdsClientConnectionException ex)
    {
        if (!(ex.InnerException is AggregateException aggregate))
        {
            return false;
        }

        if (aggregate.InnerException is FaultException<OrganizationServiceFault> fault)
        {
            if (IsThrottlingError(fault.Detail.ErrorCode))
            {
                return true;
            }

            // Any other service fault is not retried; dump it for diagnosis.
            Console.WriteLine($"FaultException<OrganizationServiceFault>\n{fault.Message}\n{fault.Detail.ErrorCode}");
            Console.WriteLine(string.Join("\n", fault.Detail.ErrorDetails.Select(kv => $"{kv.Key}={kv.Value}")));
            return false;
        }

        if (aggregate.InnerException is CommunicationException)
        {
            Console.WriteLine("Failed to communicate"); //Network issue
            return true;
        }

        if (aggregate.InnerException is TimeoutException)
        {
            Console.WriteLine("Failed to communicate: Timeout"); //Network issue
            return true;
        }

        return false;
    }
}
/// <summary>
/// Converts <see cref="Entity"/> instances to and from their data-contract XML text.
/// </summary>
public static class EntitySerializationHelper
{
    /// <summary>
    /// Serializes the entity with a <see cref="DataContractSerializer"/> for <see cref="Entity"/>
    /// and returns the written bytes decoded as UTF-8.
    /// </summary>
    /// <param name="entity">The entity to serialize.</param>
    /// <returns>The serialized text.</returns>
    public static string Serialize(this Entity entity)
    {
        var serializer = new DataContractSerializer(typeof(Entity));
        using (var buffer = new MemoryStream())
        {
            serializer.WriteObject(buffer, entity);
            return Encoding.UTF8.GetString(buffer.ToArray());
        }
    }

    /// <summary>
    /// Reads an entity back from text produced by <see cref="Serialize"/>.
    /// </summary>
    /// <param name="str">The serialized text.</param>
    /// <returns>The deserialized entity, or null when the text does not deserialize to an <see cref="Entity"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="str"/> is null.</exception>
    public static Entity DeserializeToCrmEntity(string str)
    {
        if (str == null)
        {
            throw new ArgumentNullException(nameof(str));
        }

        var serializer = new DataContractSerializer(typeof(Entity));
        using (var buffer = new MemoryStream(Encoding.UTF8.GetBytes(str)))
        {
            return serializer.ReadObject(buffer) as Entity;
        }
    }
}
/// <summary>
/// Convenience extensions for entities and <see cref="IOrganizationService"/>, plus the
/// dependency-injection registration that hands out pooled service clients.
/// </summary>
public static class DynamicsIServiceCollectionExtensions
{
    // Fault code on which Retrieve returns null instead of throwing.
    // NOTE(review): 0x80040217, believed to be the "object does not exist" fault - confirm against the error code reference.
    private const int RecordNotFoundErrorCode = -2147220969;

    // Clients waiting to be reused by a later scope.
    private static readonly ConcurrentQueue<PooledOrganizaitionService> queue = new ConcurrentQueue<PooledOrganizaitionService>();

    /// <summary>
    /// Builds a reference to the entity, addressed by its key attributes when it has any
    /// and by its id otherwise.
    /// </summary>
    public static EntityReference ToEntityReferenceWithKeys(this Entity entity)
    {
        if (entity.KeyAttributes.Any())
        {
            return new EntityReference(entity.LogicalName, entity.KeyAttributes);
        }
        return entity.ToEntityReference();
    }

    /// <summary>
    /// Retrieves the referenced record as <typeparamref name="T"/>.
    /// </summary>
    /// <param name="reference">The record to retrieve.</param>
    /// <param name="service">The service to execute the request on.</param>
    /// <param name="columnSet">The columns to retrieve.</param>
    /// <returns>The record, or null when the service faults with error code -2147220969.</returns>
    public static T Retrieve<T>(this EntityReference reference, IOrganizationService service, ColumnSet columnSet) where T : Entity
    {
        try
        {
            var resp = service.Execute<RetrieveResponse>(new RetrieveRequest
            {
                Target = reference,
                ColumnSet = columnSet
            });
            return resp.Entity.ToEntity<T>();
        }
        catch (FaultException<OrganizationServiceFault> ex) when (ex.Detail.ErrorCode == RecordNotFoundErrorCode)
        {
            return null;
        }
    }

    /// <summary>
    /// Executes the request and returns the response as <typeparamref name="T"/>.
    /// </summary>
    /// <returns>The response, or null when it is not a <typeparamref name="T"/>.</returns>
    public static T Execute<T>(this IOrganizationService client, OrganizationRequest message)
        where T : OrganizationResponse
    {
        return client.Execute(message) as T;
    }

    /// <summary>
    /// Upserts the entity, copies the resulting id back onto it and logs whether the record
    /// was created or updated.
    /// </summary>
    /// <param name="service">The service to execute the request on.</param>
    /// <param name="entity">The entity to upsert; its <c>Id</c> is updated from the response target when one is returned.</param>
    /// <param name="log">Logger for the outcome; may be null.</param>
    /// <returns>The upsert response.</returns>
    /// <exception cref="InvalidOperationException">The service did not return an <see cref="UpsertResponse"/>.</exception>
    public static UpsertResponse Upsert(this IOrganizationService service, Entity entity, ILogger log)
    {
        var req = new UpsertRequest() { Target = entity };
        var resp = service.Execute(req) as UpsertResponse;
        if (resp == null)
        {
            // Fail with a descriptive error rather than a NullReferenceException on the next line.
            throw new InvalidOperationException(
                $"Upsert of {entity.LogicalName} did not return an {nameof(UpsertResponse)}.");
        }

        entity.Id = resp.Target?.Id ?? entity.Id;
        log?.LogInformation("{EntityLogicalName}|{Id} was {CreatedOrUpdated}", entity.LogicalName, entity.Id, resp.RecordCreated ? "Created" : "Updated");
        return resp;
    }

    /// <summary>
    /// Registers <see cref="TokenService"/> as a singleton and a scoped
    /// <see cref="PooledOrganizaitionService"/> (also resolvable as <see cref="IOrganizationService"/>)
    /// that is taken from the pool when one is available and created otherwise.
    /// </summary>
    /// <param name="services">The service collection to add to.</param>
    /// <param name="cache">Currently not used; the caching decorator it used to select is not wired up.</param>
    /// <returns>The same service collection.</returns>
    public static IServiceCollection AddPowerPlatform(this IServiceCollection services, bool cache = false)
    {
        services.AddSingleton<TokenService>();
        services.AddScoped((sp) =>
        {
            // TryDequeue already reports an empty queue; a preceding Any() only adds an extra pass.
            if (queue.TryDequeue(out var pooled))
            {
                pooled.Logger = sp.GetRequiredService<ILogger<CdsServiceClient>>(); //Would be cool to get the logger for the function scope
                return pooled;
            }

            var configuration = sp.GetRequiredService<IConfiguration>();
            var uri = new Uri(configuration.GetValue<string>("CDSEnvironment"));
            CdsServiceClient.MaxConnectionTimeout = TimeSpan.FromMinutes(5);

            // Creating the client goes through the retry policy; the logger is passed along in the policy context.
            CdsServiceClient service = CDSPolly.RetryPolicy.Execute(
                (context) => new CdsServiceClient(uri, sp.GetRequiredService<TokenService>().GetTokenAsync),
                new Context
                {
                    ["logger"] = sp.GetRequiredService<ILogger<CdsServiceClient>>(),
                });
            service.DisableCrossThreadSafeties = true;

            return new PooledOrganizaitionService(service, queue)
            {
                Logger = sp.GetRequiredService<ILogger<CdsServiceClient>>()
            };
        });
        services.AddScoped(sp =>
        {
            return sp.GetRequiredService<PooledOrganizaitionService>() as IOrganizationService;
        });
        return services;
    }
}
/// <summary>
/// An <see cref="IOrganizationService"/> over a <see cref="CdsServiceClient"/> that, instead of
/// tearing the client down, puts itself back on a shared queue when disposed so it can be reused.
/// </summary>
public class PooledOrganizaitionService : IDisposable, IOrganizationService
{
    private readonly CdsServiceClient service;
    private readonly ConcurrentQueue<PooledOrganizaitionService> queue;

    // Serializes Execute calls on this instance.
    private readonly object lockobj = new object();

    private ILogger logger = Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance;

    // 1 while the instance is handed out, 0 once it has been returned to the queue.
    private int leased = 1;

    /// <summary>Passed as <c>useWebAPI</c> to the client on every <see cref="Execute(OrganizationRequest)"/>.</summary>
    public bool UseWebApi { get; set; } = false;

    /// <summary>
    /// Logger for the current user of this instance. A null assignment falls back to a no-op logger.
    /// Assigning the logger also marks the instance as handed out again, so that the next
    /// <see cref="Dispose"/> returns it to the queue.
    /// </summary>
    public ILogger Logger
    {
        get => logger;
        set
        {
            logger = value ?? Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance;
            System.Threading.Interlocked.Exchange(ref leased, 1);
        }
    }

    /// <summary>Wraps the client.</summary>
    /// <param name="service">The client all operations are forwarded to.</param>
    /// <param name="queue">The queue this instance adds itself to on <see cref="Dispose"/>.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public PooledOrganizaitionService(CdsServiceClient service, ConcurrentQueue<PooledOrganizaitionService> queue)
    {
        this.service = service ?? throw new ArgumentNullException(nameof(service));
        this.queue = queue ?? throw new ArgumentNullException(nameof(queue));
    }

    /// <summary>Forwards directly to the client (not routed through <see cref="Execute(OrganizationRequest)"/>).</summary>
    public void Associate(string entityName, Guid entityId, Relationship relationship, EntityReferenceCollection relatedEntities)
    {
        service.Associate(entityName, entityId, relationship, relatedEntities);
    }

    /// <summary>Creates the record, stores the new id on <paramref name="entity"/> and returns it.</summary>
    public Guid Create(Entity entity)
    {
        using (Logger.BeginScope(new Dictionary<string, string> { { "CreateOperationId", Guid.NewGuid().ToString() } }))
        {
            Logger.LogInformation("Creating Entity<{EntityLogicalName}>", entity.LogicalName);
            var createResponse = Execute<CreateResponse>(new CreateRequest
            {
                Target = entity
            });
            entity.Id = createResponse.id;
            Logger.LogInformation("Created Entity<{EntityLogicalName}> with {Id}", entity.LogicalName, entity.Id);
            return entity.Id;
        }
    }

    /// <summary>Deletes the record through a <see cref="DeleteRequest"/>.</summary>
    public void Delete(string entityName, Guid id)
    {
        Execute<DeleteResponse>(new DeleteRequest
        {
            Target = new EntityReference(entityName, id)
        });
    }

    /// <summary>Forwards directly to the client (not routed through <see cref="Execute(OrganizationRequest)"/>).</summary>
    public void Disassociate(string entityName, Guid entityId, Relationship relationship, EntityReferenceCollection relatedEntities)
    {
        service.Disassociate(entityName, entityId, relationship, relatedEntities);
    }

    /// <summary>
    /// Returns this instance to the queue. Repeated calls without the instance having been
    /// handed out again in between are ignored, so one instance never sits in the queue twice.
    /// </summary>
    public void Dispose()
    {
        // A container that tracks this object under more than one registration may dispose it
        // more than once per scope; only the first call may enqueue.
        if (System.Threading.Interlocked.Exchange(ref leased, 0) == 1)
        {
            queue.Enqueue(this);
        }
    }

    /// <summary>
    /// Executes the request on the client, one request at a time per instance, logging start and outcome.
    /// </summary>
    /// <param name="request">The request to execute.</param>
    /// <returns>The client's response.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The client returned no response and recorded no exception.</exception>
    /// <remarks>When the client returns no response, its <c>LastCdsException</c> is thrown.</remarks>
    public OrganizationResponse Execute(OrganizationRequest request)
    {
        if (request == null)
        {
            throw new ArgumentNullException(nameof(request));
        }

        lock (lockobj)
        {
            using (Logger.BeginScope(new Dictionary<string, string> { { "ExecuteOperationId", Guid.NewGuid().ToString() } }))
            {
                if (request is ExecuteMultipleRequest multipleRequest)
                {
                    Logger.LogInformation("Executing Operation<{RequestName}> with {RequestCount}", request.RequestName, multipleRequest.Requests.Count);
                }
                else
                {
                    Logger.LogInformation("Executing Operation<{RequestName}>", request.RequestName);
                }

                var response = service.ExecuteCdsOrganizationRequest(request, useWebAPI: UseWebApi);
                if (response == null)
                {
                    var failure = service.LastCdsException;
                    Logger.LogInformation(failure, "Execution Failed for Operation<{RequestName}>", request.RequestName);

                    // "throw null" would surface as a NullReferenceException that hides the failed operation.
                    if (failure == null)
                    {
                        throw new InvalidOperationException(
                            $"Execution failed for operation {request.RequestName} and the client reported no exception.");
                    }
                    throw failure;
                }

                Logger.LogInformation("Executed Operation<{RequestName}>", request.RequestName);
                return response;
            }
        }
    }

    /// <summary>Executes the request and returns the response as <typeparamref name="T"/>, or null when it is of another type.</summary>
    public T Execute<T>(OrganizationRequest request) where T : OrganizationResponse
    {
        return Execute(request) as T;
    }

    /// <summary>Retrieves one record through a <see cref="RetrieveRequest"/>.</summary>
    public Entity Retrieve(string entityName, Guid id, ColumnSet columnSet)
    {
        var response = Execute<RetrieveResponse>(new RetrieveRequest
        {
            ColumnSet = columnSet,
            Target = new EntityReference(entityName, id)
        });
        return response.Entity;
    }

    /// <summary>Runs the query through a <see cref="RetrieveMultipleRequest"/>.</summary>
    public EntityCollection RetrieveMultiple(QueryBase query)
    {
        var response = Execute<RetrieveMultipleResponse>(new RetrieveMultipleRequest
        {
            Query = query
        });
        return response.EntityCollection;
    }

    /// <summary>Updates the record through an <see cref="UpdateRequest"/>.</summary>
    public void Update(Entity entity)
    {
        Execute<UpdateResponse>(new UpdateRequest
        {
            Target = entity
        });
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment