Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
PropertyManager-based Cluster Membership provider for Service Fabric integration in Microsoft Orleans
namespace Microsoft.Orleans.ServiceFabric
{
using System;
using System.Collections.Generic;
using System.Fabric;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using global::Orleans;
using global::Orleans.Messaging;
using global::Orleans.Runtime;
using global::Orleans.Runtime.Configuration;
using global::Orleans.Serialization;
using Newtonsoft.Json;
/// <summary>
/// Cluster membership provider which uses Service Fabric's Property Manager as a consistent store.
/// </summary>
public class ServiceFabricPropertyManagerMembershipProvider : IMembershipTable, IGatewayListProvider
{
/// <summary>Name of the property that stores the table version number.</summary>
private const string VersionPropertyName = "VERSION";
/// <summary>Name of the property that stores the table-level ETag.</summary>
private const string ETagPropertyName = "ETAG";
/// <summary>ETag written when the table is first seeded; also used as the fallback when a row ETag is missing.</summary>
private const string DefaultETag = "0";
/// <summary>The Service Fabric client supplied to the constructor.</summary>
private readonly FabricClient fabricClient;
/// <summary>JSON settings used to serialize and deserialize membership entries.</summary>
private readonly JsonSerializerSettings serializerSettings;
/// <summary>Logger assigned during initialization; it is only accessed null-conditionally in this class.</summary>
private Logger log;
/// <summary>Property management client taken from <see cref="fabricClient"/> during initialization.</summary>
private FabricClient.PropertyManagementClient store;
/// <summary>Service Fabric name under which every property of the membership table is stored.</summary>
private Uri tableUri;
/// <summary>
/// Initializes a new <see cref="ServiceFabricPropertyManagerMembershipProvider"/> instance.
/// </summary>
/// <param name="client">The Service Fabric client whose property manager backs the membership table.</param>
/// <exception cref="ArgumentNullException"><paramref name="client"/> is <see langword="null"/>.</exception>
public ServiceFabricPropertyManagerMembershipProvider(FabricClient client)
{
    // Fail fast here instead of with a NullReferenceException when the provider is later initialized.
    if (client == null)
    {
        throw new ArgumentNullException(nameof(client));
    }

    this.fabricClient = client;
    this.serializerSettings = OrleansJsonSerializer.GetDefaultSerializerSettings();
}
/// <summary>
/// Prepares this instance for use as a gateway list provider. Must be called before any other member.
/// </summary>
/// <param name="config">Client configuration supplying the deployment id and the gateway refresh period.</param>
/// <param name="logger">Logger used by this provider.</param>
/// <returns>An already-completed task.</returns>
public Task InitializeGatewayListProvider(ClientConfiguration config, Logger logger)
{
    var deploymentId = config.DeploymentId;
    this.Initialize(logger, deploymentId);

    // The staleness bound reported to callers is the configured refresh period.
    var refreshPeriod = config.GatewayListRefreshPeriod;
    this.MaxStaleness = refreshPeriod;

    return Task.FromResult(0);
}
/// <summary>
/// Prepares this instance for use as a membership table. Must be called before any other member.
/// </summary>
/// <param name="config">The global configuration; its deployment id selects the table name.</param>
/// <param name="tryInitTableVersion">
/// When <see langword="true"/>, attempts to create the table name and seed the version and ETag properties.
/// </param>
/// <param name="logger">Logger used by the membership table.</param>
/// <returns>A task which completes once initialization has finished.</returns>
public async Task InitializeMembershipTable(GlobalConfiguration config, bool tryInitTableVersion, Logger logger)
{
    // Initialize stores the logger, the table name and the property store on this instance.
    this.Initialize(logger, config.DeploymentId);

    if (!tryInitTableVersion)
    {
        return;
    }

    try
    {
        await this.store.CreateNameAsync(this.tableUri);
    }
    catch (FabricElementAlreadyExistsException)
    {
        // Another caller created the name first; that is fine.
        this.log?.Verbose($"Membership table already exists in property store at {this.tableUri}");
    }

    var seedOperations = new List<PropertyBatchOperation>();

    // Precondition: the table-level ETag property must not exist yet.
    seedOperations.Add(new CheckExistsPropertyOperation(ETagPropertyName, false));

    // Seed the initial version and ETag.
    seedOperations.Add(new PutPropertyOperation(VersionPropertyName, 0));
    seedOperations.Add(new PutPropertyOperation(ETagPropertyName, DefaultETag));

    // The batch result is intentionally not inspected.
    await this.store.SubmitPropertyBatchAsync(this.tableUri, seedOperations.ToArray());
}
/// <summary>
/// Reads the membership row of a single silo together with the table version.
/// </summary>
/// <param name="siloAddress">The address of the silo whose row should be read.</param>
/// <returns>
/// Table data restricted to the rows matching <paramref name="siloAddress"/>, plus the table version,
/// taken from one consistent enumeration of the table.
/// </returns>
public Task<MembershipTableData> ReadRow(SiloAddress siloAddress) => this.ReadEntries(siloAddress);
/// <summary>
/// Reads every membership row together with the table version.
/// </summary>
/// <returns>
/// Table data containing all silo rows and the table version, taken from one consistent
/// enumeration of the table.
/// </returns>
public Task<MembershipTableData> ReadAll() => this.ReadEntries();
/// <summary>
/// Attempts to add a row for a silo and advance the table version in a single property batch.
/// </summary>
/// <remarks>
/// The batch only succeeds when the stored table ETag equals <c>tableVersion.VersionEtag</c> and no
/// entry property exists yet for the silo. On success the stored version becomes
/// <c>tableVersion.Version</c>, and both the table ETag and the row ETag become
/// <c>tableVersion.Version + 1</c> formatted with the invariant culture.
/// </remarks>
/// <param name="entry">The membership entry to insert.</param>
/// <param name="tableVersion">The new table version, carrying the ETag the table is expected to have.</param>
/// <returns><see langword="true"/> if every operation in the batch succeeded; otherwise <see langword="false"/>.</returns>
public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersion)
{
    // FailedOperationIndex holds this value when no operation in the batch failed.
    const int NoFailedOperation = -1;

    var names = RowNames.Create(entry.SiloAddress);
    var nextETag = (tableVersion.Version + 1).ToString(CultureInfo.InvariantCulture);

    var batch = new List<PropertyBatchOperation>();

    // Preconditions.
    batch.Add(new CheckValuePropertyOperation(ETagPropertyName, tableVersion.VersionEtag));
    batch.Add(new CheckExistsPropertyOperation(names.Entry, false));

    // Table version and ETag.
    batch.Add(new PutPropertyOperation(VersionPropertyName, tableVersion.Version));
    batch.Add(new PutPropertyOperation(ETagPropertyName, nextETag));

    // The silo's row: serialized entry, liveness timestamp and row ETag.
    batch.Add(new PutPropertyOperation(names.Entry, JsonConvert.SerializeObject(entry, this.serializerSettings)));
    batch.Add(new PutPropertyOperation(names.Alive, entry.IAmAliveTime.Ticks));
    batch.Add(new PutPropertyOperation(names.ETag, nextETag));

    var outcome = await this.store.SubmitPropertyBatchAsync(this.tableUri, batch.ToArray());
    return outcome.FailedOperationIndex == NoFailedOperation;
}
/// <summary>
/// Attempts to replace the row of a silo and advance the table version in a single property batch.
/// </summary>
/// <remarks>
/// The batch only succeeds when the stored table ETag equals <c>tableVersion.VersionEtag</c>, the stored
/// row ETag equals <paramref name="etag"/> (or the default ETag when <paramref name="etag"/> is
/// <see langword="null"/>), and an entry property already exists for the silo. On success the stored
/// version becomes <c>tableVersion.Version</c>, and both the table ETag and the row ETag become
/// <c>tableVersion.Version + 1</c> formatted with the invariant culture.
/// </remarks>
/// <param name="entry">The membership entry which fully replaces the stored one.</param>
/// <param name="etag">The ETag the stored row is expected to have.</param>
/// <param name="tableVersion">The new table version, carrying the ETag the table is expected to have.</param>
/// <returns><see langword="true"/> if every operation in the batch succeeded; otherwise <see langword="false"/>.</returns>
public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion)
{
    // FailedOperationIndex holds this value when no operation in the batch failed.
    const int NoFailedOperation = -1;

    var names = RowNames.Create(entry.SiloAddress);
    var nextETag = (tableVersion.Version + 1).ToString(CultureInfo.InvariantCulture);

    var batch = new List<PropertyBatchOperation>();

    // Preconditions.
    batch.Add(new CheckValuePropertyOperation(ETagPropertyName, tableVersion.VersionEtag));
    batch.Add(new CheckValuePropertyOperation(names.ETag, etag ?? DefaultETag));
    batch.Add(new CheckExistsPropertyOperation(names.Entry, true));

    // Table version and ETag.
    batch.Add(new PutPropertyOperation(VersionPropertyName, tableVersion.Version));
    batch.Add(new PutPropertyOperation(ETagPropertyName, nextETag));

    // The silo's row: serialized entry, liveness timestamp and row ETag.
    batch.Add(new PutPropertyOperation(names.Entry, JsonConvert.SerializeObject(entry, this.serializerSettings)));
    batch.Add(new PutPropertyOperation(names.Alive, entry.IAmAliveTime.Ticks));
    batch.Add(new PutPropertyOperation(names.ETag, nextETag));

    var outcome = await this.store.SubmitPropertyBatchAsync(this.tableUri, batch.ToArray());
    return outcome.FailedOperationIndex == NoFailedOperation;
}
/// <summary>
/// Overwrites the liveness timestamp of a silo's row.
/// </summary>
/// <remarks>
/// Only the silo's "alive" property is written, as a tick count. No ETag is checked and neither the
/// table version nor the table ETag is touched, so this is an unconditional in-place write.
/// </remarks>
/// <param name="entry">The entry supplying the silo address and the new liveness time.</param>
/// <returns>A task representing the write to the property store.</returns>
public Task UpdateIAmAlive(MembershipEntry entry)
{
    var aliveRowName = RowNames.GetAliveRowName(entry.SiloAddress);
    var aliveTicks = entry.IAmAliveTime.Ticks;
    return this.store.PutPropertyAsync(this.tableUri, aliveRowName, aliveTicks);
}
/// <summary>
/// Lists the gateways which clients can use to connect to the cluster.
/// </summary>
/// <returns>
/// One gateway URI for each silo in the membership table which is active and has a non-zero proxy port.
/// </returns>
public async Task<IList<Uri>> GetGateways()
{
    var table = await this.ReadAll();
    var gateways = new List<Uri>();
    foreach (var row in table.Members)
    {
        var member = row.Item1;

        // Skip silos which are not active or do not expose a gateway port.
        if (member.Status != SiloStatus.Active || member.ProxyPort == 0)
        {
            continue;
        }

        gateways.Add(GetGatewayUri(member));
    }

    return gateways;
}
/// <summary>
/// Specifies how often this gateway list provider is refreshed, which bounds the staleness of the
/// information it returns. Assigned from the client configuration's gateway list refresh period
/// when the gateway list provider is initialized.
/// </summary>
public TimeSpan MaxStaleness { get; private set; }
/// <summary>
/// Indicates whether the gateway list returned by this provider can change over time.
/// Always <see langword="true"/> for this provider.
/// </summary>
public bool IsUpdatable
{
    get { return true; }
}
/// <summary>
/// Removes the membership table of the given deployment by deleting the Service Fabric name it is stored under.
/// </summary>
/// <param name="deploymentId">The deployment whose table should be removed.</param>
/// <returns>A task representing the delete operation.</returns>
public Task DeleteMembershipTableEntries(string deploymentId)
{
    var tableName = GetTableUri(deploymentId);
    return this.store.DeleteNameAsync(tableName);
}
/// <summary>
/// Stores the table name, logger and property store shared by both initialization entry points.
/// </summary>
/// <param name="logger">The logger to keep for later use.</param>
/// <param name="deploymentId">The deployment id from which the table name is derived.</param>
private void Initialize(Logger logger, string deploymentId)
{
    Uri tableName = GetTableUri(deploymentId);
    this.tableUri = tableName;

    this.log = logger;

    FabricClient client = this.fabricClient;
    this.store = client.PropertyManager;
}
/// <summary>
/// Builds the Service Fabric name under which the membership table of a deployment is stored.
/// </summary>
/// <param name="deploymentId">The deployment id appended to the fixed table name prefix.</param>
/// <returns>A URI made of the fixed prefix followed by <paramref name="deploymentId"/>.</returns>
private static Uri GetTableUri(string deploymentId)
{
    var tableName = string.Concat("fabric:/silos_", deploymentId);
    return new Uri(tableName);
}
/// <summary>
/// Reads membership entries, retrying until a consistent read of the table is obtained.
/// </summary>
/// <param name="siloAddress">The silo to read, or <see langword="null"/> to read all silos.</param>
/// <returns>The membership entries from the first consistent read.</returns>
private async Task<MembershipTableData> ReadEntries(SiloAddress siloAddress = null)
{
    while (true)
    {
        // A null snapshot means the table changed mid-read; try again immediately.
        var snapshot = await this.TryReadEntries(siloAddress);
        if (snapshot != null)
        {
            return snapshot;
        }
    }
}
/// <summary>
/// Attempts to read membership entries, returning <see langword="null"/> if the table was modified during the read.
/// </summary>
/// <remarks>
/// Every property under the table name is enumerated. The version and ETag properties describe the table
/// itself; properties whose name starts with "." are row columns and are grouped by the silo key which
/// follows the first underscore in the property name.
/// </remarks>
/// <param name="siloAddress">The silo to read, or <see langword="null"/> to read all silos.</param>
/// <returns>
/// The membership entries or <see langword="null"/> if the table was modified during the read operation.
/// </returns>
private async Task<MembershipTableData> TryReadEntries(SiloAddress siloAddress = null)
{
    // With a specific silo only names ending in "_<silo key>" are kept; the empty suffix matches every name.
    var suffix = siloAddress == null ? string.Empty : "_" + siloAddress.ToParsableString();
    var entries = new Dictionary<string, PropertyTableEntry>();
    var tableVersion = 0;
    string tableETag = null;
    PropertyEnumerationResult result = null;
    do
    {
        // The previous result is passed back in so that the enumeration continues across pages.
        result = await this.store.EnumeratePropertiesAsync(this.tableUri, true, result);
        if (!result.IsConsistent)
        {
            // The table was modified while enumerating the properties.
            return null;
        }

        foreach (var property in result)
        {
            var name = property.Metadata.PropertyName;
            if (string.Equals(VersionPropertyName, name, StringComparison.Ordinal))
            {
                // The version is written as a 64-bit integer and narrowed back to int here.
                tableVersion = (int)property.GetValue<long>();
            }
            else if (string.Equals(ETagPropertyName, name, StringComparison.Ordinal))
            {
                tableETag = property.GetValue<string>();
            }
            else if (name.StartsWith(".", StringComparison.Ordinal) && name.EndsWith(suffix, StringComparison.Ordinal))
            {
                // Property names are machine identifiers, so they are compared ordinally.
                var key = GetSiloAddress(name);
                PropertyTableEntry row;
                if (!entries.TryGetValue(key, out row))
                {
                    row = entries[key] = new PropertyTableEntry();
                }

                if (name.StartsWith(RowNames.EntryPrefix, StringComparison.Ordinal))
                {
                    row.Entry = JsonConvert.DeserializeObject<MembershipEntry>(
                        property.GetValue<string>(),
                        this.serializerSettings);
                }
                else if (name.StartsWith(RowNames.AlivePrefix, StringComparison.Ordinal))
                {
                    row.LastIAmAliveTime = property.GetValue<long>();
                }
                else if (name.StartsWith(RowNames.ETagPrefix, StringComparison.Ordinal))
                {
                    row.ETag = property.GetValue<string>();
                }
            }
        }
    }
    while (result.HasMoreData);

    var results = new List<Tuple<MembershipEntry, string>>(entries.Count);
    foreach (var entry in entries.Values)
    {
        // Rows without a serialized entry (for example only a liveness column) are skipped.
        if (entry.Entry == null)
        {
            continue;
        }

        // The separately stored liveness column overrides the value inside the serialized entry.
        // NOTE(review): new DateTime(ticks) yields DateTimeKind.Unspecified - confirm no caller relies on Kind.
        entry.Entry.IAmAliveTime = new DateTime(entry.LastIAmAliveTime);
        results.Add(Tuple.Create(entry.Entry, entry.ETag ?? DefaultETag));
    }

    return new MembershipTableData(results, new TableVersion(tableVersion, tableETag));
}
/// <summary>
/// Extracts the silo key from a row property name: everything after the first underscore.
/// </summary>
/// <param name="name">The property name, for example a prefix followed by the silo key.</param>
/// <returns>The text after the first underscore, or the whole name when it contains no underscore.</returns>
private static string GetSiloAddress(string name)
{
    // IndexOf yields -1 when no underscore exists, which makes the start index 0.
    var separatorIndex = name.IndexOf('_');
    return name.Substring(separatorIndex + 1);
}
/// <summary>
/// Builds the gateway URI for a membership entry.
/// </summary>
/// <param name="entry">The entry supplying the silo's IP address, generation and proxy port.</param>
/// <returns>The gateway URI for the silo's IP address combined with the entry's proxy port.</returns>
private static Uri GetGatewayUri(MembershipEntry entry)
{
    var silo = entry.SiloAddress;

    // The gateway listens on the silo's address but on the proxy port rather than the silo port.
    var gatewayAddress = SiloAddress.New(new IPEndPoint(silo.Endpoint.Address, entry.ProxyPort), silo.Generation);
    return gatewayAddress.ToGatewayUri();
}
/// <summary>
/// The property names which together make up the row of one silo.
/// </summary>
private struct RowNames
{
    /// <summary>Prefix of the property holding the serialized membership entry.</summary>
    public const string EntryPrefix = ".ENTRY_";

    /// <summary>Prefix of the property holding the liveness timestamp.</summary>
    public const string AlivePrefix = ".ALIVE_";

    /// <summary>Prefix of the property holding the row ETag.</summary>
    public const string ETagPrefix = ".ETAG_";

    /// <summary>Gets the name of the serialized entry property.</summary>
    public string Entry { get; private set; }

    /// <summary>Gets the name of the row ETag property.</summary>
    public string ETag { get; private set; }

    /// <summary>Gets the name of the liveness timestamp property.</summary>
    public string Alive { get; private set; }

    /// <summary>
    /// Creates the set of property names for the given silo.
    /// </summary>
    /// <param name="siloAddress">The silo whose parsable string form is used as the row key.</param>
    /// <returns>The property names for the silo's row.</returns>
    public static RowNames Create(SiloAddress siloAddress)
    {
        var siloKey = siloAddress.ToParsableString();
        var names = new RowNames();
        names.Entry = EntryPrefix + siloKey;
        names.ETag = ETagPrefix + siloKey;
        names.Alive = AlivePrefix + siloKey;
        return names;
    }

    /// <summary>
    /// Gets only the liveness property name for the given silo.
    /// </summary>
    /// <param name="siloAddress">The silo whose parsable string form is used as the row key.</param>
    /// <returns>The liveness property name for the silo.</returns>
    public static string GetAliveRowName(SiloAddress siloAddress)
    {
        return AlivePrefix + siloAddress.ToParsableString();
    }
}
/// <summary>
/// Mutable holder used while reading the table to collect the separately stored columns of one silo row.
/// </summary>
private class PropertyTableEntry
{
/// <summary>The deserialized membership entry, or <see langword="null"/> if no entry property was seen for the row.</summary>
public MembershipEntry Entry { get; set; }
/// <summary>The row ETag, or <see langword="null"/> if no ETag property was seen for the row.</summary>
public string ETag { get; set; }
/// <summary>The liveness timestamp in ticks, or 0 if no liveness property was seen for the row.</summary>
public long LastIAmAliveTime { get; set; }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.