Skip to content

Instantly share code, notes, and snippets.

@lobster2012-user
Last active December 22, 2019 04:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lobster2012-user/d6e2c2eac7f8ed3cfa5c0d06ecfa9cd3 to your computer and use it in GitHub Desktop.
Save lobster2012-user/d6e2c2eac7f8ed3cfa5c0d06ecfa9cd3 to your computer and use it in GitHub Desktop.
public class Amqp2ProviderFactory : IProviderFactory
{
public class Amqp2Provider : IProvider
{
private readonly IProvider _prov;
public Amqp2Provider(IProvider prov)
{
_prov = prov ?? throw new ArgumentNullException(nameof(prov));
}
#region proxy
public long SendTimeout => _prov.SendTimeout;
public Uri RemoteUri => _prov.RemoteUri;
public INmsMessageFactory MessageFactory => _prov.MessageFactory;
public Task Acknowledge(Id sessionId, AckType ackType)
{
return _prov.Acknowledge(sessionId, ackType);
}
public Task Acknowledge(InboundMessageDispatch envelope, AckType ackType)
{
return _prov.Acknowledge(envelope, ackType);
}
public void Close()
{
_prov.Close();
}
public Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
{
return _prov.Commit(transactionInfo, nextTransactionInfo);
}
public Task Connect(ConnectionInfo connectionInfo)
{
/*
var remoteHost = typeof(ConnectionInfo)
.GetProperty("remoteHost", System.Reflection.BindingFlags.NonPublic |
System.Reflection.BindingFlags.Instance);
remoteHost.SetValue(connectionInfo, RemoteUri);
*/
return _prov.Connect(connectionInfo);
}
public Task DestroyResource(ResourceInfo resourceInfo)
{
return _prov.DestroyResource(resourceInfo);
}
public Task Recover(Id sessionId)
{
return _prov.Recover(sessionId);
}
public Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
{
return _prov.Rollback(transactionInfo, nextTransactionInfo);
}
public Task Send(OutboundMessageDispatch envelope)
{
return _prov.Send(envelope);
}
public void SetProviderListener(IProviderListener providerListener)
{
_prov.SetProviderListener(providerListener);
}
public void Start()
{
_prov.Start();
}
public Task StartResource(ResourceInfo resourceInfo)
{
return _prov.StartResource(resourceInfo);
}
public Task StopResource(ResourceInfo resourceInfo)
{
return _prov.StopResource(resourceInfo);
}
public Task Unsubscribe(string name)
{
return _prov.Unsubscribe(name);
}
#endregion
public Task CreateResource(ResourceInfo resourceInfo)
{
if (resourceInfo is ConsumerInfo ci)
{
var keyValues = this.RemoteUri.Query.TrimStart('?').Split('&');
foreach (var keyValue in keyValues)
{
var p = keyValue.Split(new[] { '=' }, 2);
var key = p[0].Trim();
var value = p.Length > 1 ? p[1].Trim() : "";
if (String.Equals(key, "consumerWindowSize", StringComparison.OrdinalIgnoreCase))
{
var linkCreditField = typeof(ConsumerInfo)
.GetField("credit", System.Reflection.BindingFlags.NonPublic |
System.Reflection.BindingFlags.Instance);
linkCreditField.SetValue(ci, Int32.Parse(value));
}
}
}
return _prov.CreateResource(resourceInfo);
}
}
public IProvider CreateProvider(Uri remoteUri)
{
var remote = remoteUri.ToString().Replace($"{Schema}://", "amqp://");
return new Amqp2Provider(
ProviderFactory.Create(new Uri(remote))
);
}
public static readonly string Schema = "amqp2";
public static void Register()
{
ProviderFactory.RegisterProviderFactory(Schema, new Amqp2ProviderFactory());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment