Skip to content

Instantly share code, notes, and snippets.

@bogdangaliceanu
Last active August 26, 2022 15:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bogdangaliceanu/d3e8b96c8aab3b4e1bb299b52daf788e to your computer and use it in GitHub Desktop.
Save bogdangaliceanu/d3e8b96c8aab3b4e1bb299b52daf788e to your computer and use it in GitHub Desktop.
Amazon Neptune autoscaling
/// <summary>
/// Gremlin access to a Neptune cluster whose set of instances may change over time.
/// Writes go through a client connected to the cluster's writer instance; reads are spread over
/// the reader instances, choosing, per query descriptor, the instance with the lowest known load.
/// The set of clients and the load information are rebuilt on every refresh, which is triggered
/// by the injected scheduler and by read failures.
/// </summary>
public sealed class DynamicNeptuneGraphDb : IDisposable
{
    private static readonly ILogger Logger = LogManager.GetCurrentClassLogger();

    private const string queryDescriptorPrefix = "//queryDescriptor:";

    private readonly NeptuneConfiguration configuration;
    private readonly IGremlinStatusProvider gremlinStatusProvider;
    private readonly IGremlinClientFactory clientFactory;
    private readonly INeptuneRefreshScheduler refreshScheduler;
    private readonly INeptuneInstanceDescriber neptuneInstanceDescriber;

    // unsignaled while a refresh is in progress (and before the first one completes); consumers wait on it
    private readonly ManualResetEventSlim refreshLock = new(false, 0);

    // serializes refreshes: they are triggered both by the scheduler and by failing reads, and they
    // mutate the client lists below, which are plain (non thread-safe) lists
    private readonly SemaphoreSlim refreshGate = new(1, 1);

    // the client that connects to the only writer instance in the cluster
    private (string InstanceId, IGremlinClient Client) writeClient;

    // the clients whose instances are ready to process queries
    private readonly List<(DBInstance Instance, IGremlinClient Client)> availableReadClients = new();

    // the clients whose instances are no longer able to accept new queries (because their status is no longer 'available') but still exist and are still processing queries
    private readonly List<(DBInstance Instance, IGremlinClient Client)> unavailableReadClients = new();

    // for each type of query, the load situation of each instance
    // (initialized eagerly so that it is never null, even if the first refresh fails before computing the load)
    private ConcurrentDictionary<string, ImmutableArray<InstanceLoadInfo>> instanceLoadInfoByQueryDescriptor = new();

    /// <summary>
    /// Stores the collaborators and subscribes to the scheduler so that every tick triggers a refresh.
    /// Nothing is connected until <see cref="Init"/> is called.
    /// </summary>
    public DynamicNeptuneGraphDb(IOptions<NeptuneConfiguration> neptuneConfiguration, IGremlinStatusProvider gremlinStatusProvider,
        IGremlinClientFactory clientFactory, INeptuneRefreshScheduler refreshScheduler, INeptuneInstanceDescriber neptuneInstanceDescriber)
    {
        this.gremlinStatusProvider = gremlinStatusProvider;
        this.clientFactory = clientFactory;
        this.refreshScheduler = refreshScheduler;
        this.neptuneInstanceDescriber = neptuneInstanceDescriber;
        this.configuration = neptuneConfiguration.Value;

        // The handler is effectively 'async void', so every exception must be caught here:
        // a failed scheduled refresh is logged and the next tick simply tries again.
        this.refreshScheduler.Elapsed += async (_, _) =>
        {
            try
            {
                await Refresh();
            }
            catch (Exception e)
            {
                Logger.Error(e);
            }
        };
    }

    /// <summary>
    /// Performs the first refresh (creating the clients) and then starts the refresh scheduler.
    /// Consumers block in <see cref="TraversalSource"/> and <see cref="ReadAsync{ResponseType}"/>
    /// until the first refresh has finished.
    /// </summary>
    public async Task Init()
    {
        await Refresh();
        refreshScheduler.Start();
    }

    /// <summary>
    /// Disposes the scheduler, the injected describer and status provider, and every client this
    /// instance created, including the ones kept for instances that are no longer available.
    /// </summary>
    public void Dispose()
    {
        refreshScheduler.Dispose();
        neptuneInstanceDescriber.Dispose();
        writeClient.Client?.Dispose();
        availableReadClients.ForEach(c => c.Client.Dispose());
        // these clients are still open (they are kept while their instances drain) and must be released too
        unavailableReadClients.ForEach(c => c.Client.Dispose());
        refreshLock.Dispose();
        refreshGate.Dispose();
        gremlinStatusProvider.Dispose();
    }

    /// <summary>
    /// A traversal source bound to the current writer client. Blocks while a refresh is in progress.
    /// A new remote connection is created on every access.
    /// </summary>
    public GraphTraversalSource TraversalSource
    {
        get
        {
            refreshLock.Wait();
            var connection = new DriverRemoteConnection(writeClient.Client);
            return Traversal().WithRemote(connection);
        }
    }

    /// <summary>
    /// Submits a read query to the least loaded reader for the given descriptor.
    /// The descriptor is prepended to the script as a comment line so that it can be recognized
    /// later in the instances' reported running queries.
    /// </summary>
    /// <param name="queryDescriptor">Identifies the kind of query; used as the load-balancing key.</param>
    /// <param name="requestScript">The Gremlin script to run.</param>
    /// <returns>The result set returned by the chosen client.</returns>
    /// <remarks>
    /// On a connectivity failure the clients are refreshed and the query is retried exactly once;
    /// a failure of the retry propagates to the caller.
    /// </remarks>
    public async Task<ResultSet<ResponseType>> ReadAsync<ResponseType>(string queryDescriptor, string requestScript)
    {
        var readClient = GetReadClient(queryDescriptor);
        requestScript = $"{queryDescriptorPrefix}{queryDescriptor}\n{requestScript}";
        try
        {
            return await readClient.SubmitAsync<ResponseType>(requestScript);
        }
        catch (Exception e) when (e is WebSocketException or ObjectDisposedException or ServerUnavailableException)
        // WebSocketException and ServerUnavailableException if the instance becomes unavailable between client refreshes
        // ObjectDisposedException if a client refresh occurs between obtaining the client and submitting the query, the instance is unavailable and thus the client is disposed
        {
            await Refresh();
            readClient = GetReadClient(queryDescriptor);
            return await readClient.SubmitAsync<ResponseType>(requestScript);
        }
    }

    /// <summary>
    /// Picks the client of the reader with the lowest recorded load for the descriptor and counts
    /// the new query against it. Falls back to the writer client when no reader is available.
    /// </summary>
    private IGremlinClient GetReadClient(string queryDescriptor)
    {
        // NOTE(review): a refresh may still start right after this wait returns and mutate the list
        // read below; closing that window would require swapping immutable snapshots.
        refreshLock.Wait();
        if (availableReadClients.IsEmpty())
        {
            return writeClient.Client;
        }

        // a descriptor not seen during the last refresh starts with zero load on every reader
        var loadInfo = instanceLoadInfoByQueryDescriptor.GetOrAdd(
            queryDescriptor,
            _ => availableReadClients
                .Select((_, i) => new InstanceLoadInfo { InstanceIndex = i, CurrentLoad = 0 })
                .ToImmutableArray()
        );
        var leastUtilizedInstanceInfo = loadInfo.MinBy(x => x.CurrentLoad);
        // the counter is only ever incremented here; it is reset when the next refresh rebuilds the load info
        Interlocked.Increment(ref leastUtilizedInstanceInfo!.CurrentLoad);
        return availableReadClients[leastUtilizedInstanceInfo.InstanceIndex].Client;
    }

    /// <summary>
    /// Rebuilds the clients and the load information. Consumers are blocked for the duration and
    /// released even when the refresh fails. Only one refresh runs at a time; concurrent requests
    /// queue up behind the running one.
    /// </summary>
    private async Task Refresh()
    {
        await refreshGate.WaitAsync();
        try
        {
            refreshLock.Reset();
            try
            {
                var statuses = await RefreshClients();
                RefreshLoad(statuses);
            }
            finally
            {
                refreshLock.Set();
            }
        }
        finally
        {
            refreshGate.Release();
        }
    }

    /// <summary>
    /// Brings the clients in line with the instances currently reported for the cluster and
    /// returns the statuses of the readers that answered the status request, in the same order
    /// as <c>availableReadClients</c> is left in.
    /// </summary>
    private async Task<ImmutableArray<GremlinStatus>> RefreshClients()
    {
        var (writer, readers) = await neptuneInstanceDescriber.GetInstances(configuration.Cluster);
        var instancesDiff = DetermineInstancesDiff(writer, readers);

        if (instancesDiff.ChangedWriterInstance != null)
        {
            writeClient.Client?.Dispose();
            writeClient = (
                instancesDiff.ChangedWriterInstance.DBInstanceIdentifier,
                clientFactory.CreateClient(instancesDiff.ChangedWriterInstance.Endpoint.Address, instancesDiff.ChangedWriterInstance.Endpoint.Port,
                    configuration.PoolSize, configuration.MaxInProcessPerConnection)
            );
        }

        // readers that stopped being 'available' are parked (not disposed): they may still be processing queries
        availableReadClients.RemoveAll(c =>
        {
            if (instancesDiff.NewlyUnavailableReaderInstances.Any(i => i.DBInstanceIdentifier == c.Instance.DBInstanceIdentifier))
            {
                unavailableReadClients.Add(c);
                return true;
            }
            return false;
        });

        // parked readers that disappeared or became available again no longer need their old client
        unavailableReadClients.RemoveAll(c =>
        {
            if (instancesDiff.ObsoleteUnavailableInstanceIds.Contains(c.Instance.DBInstanceIdentifier))
            {
                c.Client.Dispose();
                return true;
            }
            return false;
        });

        availableReadClients.AddRange(instancesDiff.NewlyAvailableReaderInstances.Select(i => (
            i,
            clientFactory.CreateClient(i.Endpoint.Address, i.Endpoint.Port, configuration.PoolSize, configuration.MaxInProcessPerConnection)
        )));

        // a reader whose status could not be retrieved (null status) is dropped and its client disposed
        var statuses = await GetAvailableReadersStatuses();
        var (reachable, unreachable) = statuses.Partition(s => s.Status != null);
        availableReadClients.RemoveAll(c =>
        {
            if (unreachable.Any(s => s.InstanceId == c.Instance.DBInstanceIdentifier))
            {
                c.Client.Dispose();
                return true;
            }
            return false;
        });

        return reachable.Select(s => s.Status).ToImmutableArray();
    }

    /// <summary>
    /// Replaces the load information with one computed from the queries the instances report as
    /// running: for every descriptor found, one entry per instance holding the number of running
    /// queries that carry that descriptor.
    /// </summary>
    /// <param name="statuses">One status per available reader; the position is used as the instance index.</param>
    private void RefreshLoad(ImmutableArray<GremlinStatus> statuses)
    {
        // parse each reported query once and count the descriptors per instance
        var descriptorCountsPerInstance = statuses
            .Select(status => status.Queries
                .Select(q => ExtractQueryDescriptor(q.QueryString))
                .Where(qd => !string.IsNullOrEmpty(qd))
                .GroupBy(qd => qd)
                .ToDictionary(g => g.Key, g => g.Count()))
            .ToImmutableArray();

        var allQueryDescriptors = descriptorCountsPerInstance
            .SelectMany(counts => counts.Keys)
            .Distinct();

        instanceLoadInfoByQueryDescriptor = new ConcurrentDictionary<string, ImmutableArray<InstanceLoadInfo>>(
            allQueryDescriptors.Select(qd => KeyValuePair.Create(
                qd,
                descriptorCountsPerInstance
                    .Select((counts, i) => new InstanceLoadInfo
                    {
                        InstanceIndex = i,
                        // an instance not running any query with this descriptor has zero load for it
                        CurrentLoad = counts.TryGetValue(qd, out var count) ? count : 0,
                    })
                    .ToImmutableArray()
            ))
        );
    }

    /// <summary>
    /// Requests the status of every available reader concurrently.
    /// A failed request is logged and reported as a null status instead of failing the whole batch.
    /// </summary>
    private async Task<(string InstanceId, GremlinStatus Status)[]> GetAvailableReadersStatuses()
    {
        var statusRetrievals = availableReadClients
            .Select(async c =>
            {
                try
                {
                    return (c.Instance.DBInstanceIdentifier, await gremlinStatusProvider.Get(c.Instance.Endpoint.Address, c.Instance.Endpoint.Port));
                }
                catch (Exception e)
                {
                    Logger.Error(e);
                    return (c.Instance.DBInstanceIdentifier, null);
                }
            });
        return await Task.WhenAll(statusRetrievals);
    }

    /// <summary>
    /// Returns the descriptor carried on the first line of a query submitted through
    /// <see cref="ReadAsync{ResponseType}"/>, or an empty string when the query is null, does not
    /// start with the descriptor prefix, or has no line break terminating the descriptor line.
    /// </summary>
    private static string ExtractQueryDescriptor(string query)
    {
        if (query is null || !query.StartsWith(queryDescriptorPrefix, StringComparison.Ordinal))
        {
            return string.Empty;
        }

        var descriptorStart = queryDescriptorPrefix.Length;
        var lineEnd = query.IndexOf('\n', descriptorStart);
        if (lineEnd < 0)
        {
            // without the terminating line break the descriptor cannot be delimited;
            // treat the query as having none rather than passing a negative length to Substring
            return string.Empty;
        }

        return query.Substring(descriptorStart, lineEnd - descriptorStart);
    }

    /// <summary>
    /// Compares the instances currently reported for the cluster with the clients held by this
    /// object and describes what has to change.
    /// </summary>
    private InstancesDiff DetermineInstancesDiff(DBInstance currentWriteInstance, ImmutableArray<DBInstance> currentReadInstances)
    {
        // 'available' readers for which there is no client yet
        var newlyAvailableReaderInstances = currentReadInstances
            .Where(i =>
            {
                if (i.DBInstanceStatus != "available")
                {
                    return false;
                }
                var isNotAlreadyKnown = availableReadClients.All(c => c.Instance.DBInstanceIdentifier != i.DBInstanceIdentifier);
                return isNotAlreadyKnown;
            })
            .ToImmutableArray();

        // readers that have a client in the available list but are no longer 'available'
        var newlyUnavailableReaderInstances = currentReadInstances
            .Where(i =>
            {
                if (i.DBInstanceStatus == "available")
                {
                    return false;
                }
                var usedToBeAvailable = availableReadClients.Any(c => c.Instance.DBInstanceIdentifier == i.DBInstanceIdentifier);
                var isNotAlreadyKnown = unavailableReadClients.All(c => c.Instance.DBInstanceIdentifier != i.DBInstanceIdentifier);
                return usedToBeAvailable && isNotAlreadyKnown;
            })
            .ToImmutableArray();

        // parked readers whose old client is no longer needed
        var obsoleteUnavailableReaderInstanceIds = unavailableReadClients
            .Select(c => c.Instance.DBInstanceIdentifier)
            .Where(id =>
            {
                var doesNotExistAnymore = currentReadInstances.All(i => i.DBInstanceIdentifier != id);
                var becameAvailableAgain = newlyAvailableReaderInstances.Any(i => i.DBInstanceIdentifier == id);
                return doesNotExistAnymore || becameAvailableAgain;
            })
            .ToImmutableArray();

        // null means the writer is the one the current write client already points to
        var changedWriterInstance = currentWriteInstance.DBInstanceIdentifier == writeClient.InstanceId
            ? null
            : currentWriteInstance;

        return new InstancesDiff
        {
            ChangedWriterInstance = changedWriterInstance,
            NewlyAvailableReaderInstances = newlyAvailableReaderInstances,
            NewlyUnavailableReaderInstances = newlyUnavailableReaderInstances,
            ObsoleteUnavailableInstanceIds = obsoleteUnavailableReaderInstanceIds,
        };
    }

    /// <summary>The changes needed to align the held clients with the cluster's current instances.</summary>
    private class InstancesDiff
    {
        // the new writer instance, or null when the writer did not change
        public DBInstance ChangedWriterInstance { get; init; }
        public ImmutableArray<DBInstance> NewlyAvailableReaderInstances { get; init; }
        public ImmutableArray<DBInstance> NewlyUnavailableReaderInstances { get; init; }
        public ImmutableArray<string> ObsoleteUnavailableInstanceIds { get; init; }
    }

    /// <summary>The load of one reader for one query descriptor.</summary>
    private class InstanceLoadInfo
    {
        // position of the reader in availableReadClients
        public int InstanceIndex;
        // a public field (not a property) because it is updated through Interlocked.Increment(ref ...)
        public int CurrentLoad;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment