Skip to content

Instantly share code, notes, and snippets.

@wmeints
Last active July 7, 2020 13:11
Show Gist options
  • Save wmeints/be00517e291622c42c07f5bcce583b79 to your computer and use it in GitHub Desktop.
Save wmeints/be00517e291622c42c07f5bcce583b79 to your computer and use it in GitHub Desktop.
A wrapper around the Gremlin.NET client to automatically log query costs and retry failed requests.
/// <summary>
/// Wrapper class for <see cref="IGremlinClient"/> that automatically retries send operations and logs
/// information about the CosmosDB dependency in Application Insights.
/// </summary>
public class GraphClient : IGraphClient
{
private readonly TelemetryClient _telemetryClient;
private readonly IGremlinClient _client;
private readonly IOptions<GremlinSettings> _gremlinSettings;
private readonly ILogger<GraphClient> _logger;
private readonly AsyncRetryPolicy _waitAndRetryAsync;
/// <summary>
/// Initializes a new instance of <see cref="GraphClient"/>
/// </summary>
/// <param name="telemetryClient">Telemetry client to use for logging dependency data</param>
/// <param name="client">Gremlin client to wrap</param>
/// <param name="gremlinSettings">Settings for the gremlin client</param>
/// <param name="logger">Logger to use for tracking usage data</param>
public GraphClient(TelemetryClient telemetryClient, IGremlinClient client,
IOptions<GremlinSettings> gremlinSettings, ILogger<GraphClient> logger)
{
_telemetryClient = telemetryClient;
_client = client;
_gremlinSettings = gremlinSettings;
_logger = logger;
_waitAndRetryAsync = Policy
.Handle<ResponseException>(error => error.CosmosDbStatusCode() == 429)
.WaitAndRetryAsync(
10,
(attempt, exception, _) => ((ResponseException)exception).CosmosDbRetryAfter(),
(exception, waitTime, attempt, _) =>
{
_logger.LogWarning("Attempt {Attempt}: Retrying operation after {WaitTime}", attempt, waitTime);
return Task.CompletedTask;
});
}
/// <summary>
/// Executes a graph query against the CosmosDB backend
/// </summary>
/// <param name="query">Query to execute against the graph</param>
/// <param name="parameterBindings">Parameter bindings for the query</param>
/// <typeparam name="T">Type of result to get</typeparam>
/// <returns>Returns the outcome of the query</returns>
public async Task<ResultSet<T>> ExecuteAsync<T>(string query,
Dictionary<string, object> parameterBindings = null)
{
return await _waitAndRetryAsync.ExecuteAsync(() => ExecuteAsyncInternal<T>(query, parameterBindings));
}
private async Task<ResultSet<T>> ExecuteAsyncInternal<T>(string query,
Dictionary<string, object> parameterBindings)
{
var stopwatch = new Stopwatch();
var startTime = DateTime.UtcNow;
var success = true;
ResultSet<T> results = null;
try
{
stopwatch.Start();
results = await _client.SubmitAsync<T>(query, parameterBindings);
}
catch (NullReferenceException)
{
// Yes, sometimes this thing raises null reference errors when we don't get any content back
// from CosmosDB. This is in fact okay, because some operations don't return results.
// For example, if you get an error, CosmosDB doesn't always send a normal reply.
}
catch (Exception ex)
{
success = false;
_logger.LogError(ex, "Failed to execute query");
throw;
}
finally
{
stopwatch.Stop();
var dependency = new DependencyTelemetry(
"CosmosDB",
_gremlinSettings.Value.HostName,
_gremlinSettings.Value.UserName,
query);
dependency.Duration = stopwatch.Elapsed;
dependency.Timestamp = startTime;
dependency.Success = success;
if (results != null)
{
foreach (var attribute in results.StatusAttributes)
{
dependency.Properties[attribute.Key] = attribute.Value.ToString();
}
}
_telemetryClient.TrackDependency(dependency);
}
return results;
}
}
/// <summary>
/// Use these extension methods to provide a static connection to the Gremlin API of Cosmos DB.
/// The static connection ensures that you don't create a new connection for every query as this
/// will cause significant performance problems in your application.
/// </summary>
public static class GremlinExtensions
{
/// <summary>
/// Adds a gremlin client to the dependencies collection of the application.
/// </summary>
/// <param name="services">Service collection to extend.</param>
public static void AddGremlin(this IServiceCollection services)
{
services
.AddOptions<GremlinSettings>()
.Configure<IConfiguration>((settings, configuration) =>
{
configuration.GetSection("Gremlin").Bind(settings);
});
services.AddSingleton(serviceProvider =>
{
var settings = serviceProvider.GetRequiredService<IOptions<GremlinSettings>>().Value;
var server = new GremlinServer(
hostname: settings.HostName,
port: settings.Port,
enableSsl: settings.EnableSsl,
username: settings.UserName,
password: settings.AccessKey);
return server;
});
services.AddSingleton<IGremlinClient>(serviceProvider => new GremlinClient(
gremlinServer: serviceProvider.GetRequiredService<GremlinServer>(),
graphSONReader: new GraphSON2Reader(),
graphSONWriter: new GraphSON2Writer(),
mimeType: GremlinClient.GraphSON2MimeType));
services.AddSingleton<IGraphClient, GraphClient>();
}
}
/// <summary>
/// Settings for the gremlin API connection to use.
/// </summary>
public class GremlinSettings
{
/// <summary>
/// Gets or sets the hostname of the Gremlin API to connect to.
/// </summary>
public string HostName { get; set; }
/// <summary>
/// Gets the port to connect on (default 8182).
/// </summary>
public int Port { get; set; }
/// <summary>
/// Gets or sets the username to use.
/// </summary>
public string UserName { get; set; }
/// <summary>
/// Gets or sets the access key to use.
/// </summary>
public string AccessKey { get; set; }
/// <summary>
/// Gets or sets whether SSL should be used for the connection (true) or not (false).
/// </summary>
public bool EnableSsl{ get; set; }
}
/// <summary>
/// Extension methods to easily extract information from <see cref="ResponseException"/>.
/// </summary>
public static class ResponseExceptionExtensions
{
/// <summary>
/// Parses the status code from a CosmosDB response exception
/// </summary>
/// <param name="source">The source exception to parse</param>
/// <returns>The status code from the exception</returns>
public static int CosmosDbStatusCode(this ResponseException source)
{
if (source == null)
{
return 500;
}
if (!source.StatusAttributes.TryGetValue("x-ms-status-code", out var code))
{
throw new InvalidOperationException("Header 'x-ms-status-code' is not presented.");
}
return Int32.Parse(code.ToString());
}
/// <summary>
/// Parses the retry header from a CosmosDB response exception.
/// </summary>
/// <param name="source">The source exception to parse</param>
/// <returns>The timeout in ms retrieved from the response exception if present; Otherwise 200ms</returns>
public static TimeSpan CosmosDbRetryAfter(this ResponseException source)
{
// If no retry header is presented, we perform a fallback to a sane default.
// This should prevent the app from spinning in an endless exception loop.
if (!source.StatusAttributes.TryGetValue("x-ms-retry-after-ms", out var time))
{
return TimeSpan.FromMilliseconds(200);
}
return TimeSpan.Parse(time.ToString());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment