-
-
Save lobster2012-user/d6e2c2eac7f8ed3cfa5c0d06ecfa9cd3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Amqp2ProviderFactory : IProviderFactory | |
{ | |
public class Amqp2Provider : IProvider | |
{ | |
private readonly IProvider _prov; | |
public Amqp2Provider(IProvider prov) | |
{ | |
_prov = prov ?? throw new ArgumentNullException(nameof(prov)); | |
} | |
#region proxy | |
public long SendTimeout => _prov.SendTimeout; | |
public Uri RemoteUri => _prov.RemoteUri; | |
public INmsMessageFactory MessageFactory => _prov.MessageFactory; | |
public Task Acknowledge(Id sessionId, AckType ackType) | |
{ | |
return _prov.Acknowledge(sessionId, ackType); | |
} | |
public Task Acknowledge(InboundMessageDispatch envelope, AckType ackType) | |
{ | |
return _prov.Acknowledge(envelope, ackType); | |
} | |
public void Close() | |
{ | |
_prov.Close(); | |
} | |
public Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo) | |
{ | |
return _prov.Commit(transactionInfo, nextTransactionInfo); | |
} | |
public Task Connect(ConnectionInfo connectionInfo) | |
{ | |
/* | |
var remoteHost = typeof(ConnectionInfo) | |
.GetProperty("remoteHost", System.Reflection.BindingFlags.NonPublic | | |
System.Reflection.BindingFlags.Instance); | |
remoteHost.SetValue(connectionInfo, RemoteUri); | |
*/ | |
return _prov.Connect(connectionInfo); | |
} | |
public Task DestroyResource(ResourceInfo resourceInfo) | |
{ | |
return _prov.DestroyResource(resourceInfo); | |
} | |
public Task Recover(Id sessionId) | |
{ | |
return _prov.Recover(sessionId); | |
} | |
public Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo) | |
{ | |
return _prov.Rollback(transactionInfo, nextTransactionInfo); | |
} | |
public Task Send(OutboundMessageDispatch envelope) | |
{ | |
return _prov.Send(envelope); | |
} | |
public void SetProviderListener(IProviderListener providerListener) | |
{ | |
_prov.SetProviderListener(providerListener); | |
} | |
public void Start() | |
{ | |
_prov.Start(); | |
} | |
public Task StartResource(ResourceInfo resourceInfo) | |
{ | |
return _prov.StartResource(resourceInfo); | |
} | |
public Task StopResource(ResourceInfo resourceInfo) | |
{ | |
return _prov.StopResource(resourceInfo); | |
} | |
public Task Unsubscribe(string name) | |
{ | |
return _prov.Unsubscribe(name); | |
} | |
#endregion | |
public Task CreateResource(ResourceInfo resourceInfo) | |
{ | |
if (resourceInfo is ConsumerInfo ci) | |
{ | |
var keyValues = this.RemoteUri.Query.TrimStart('?').Split('&'); | |
foreach (var keyValue in keyValues) | |
{ | |
var p = keyValue.Split(new[] { '=' }, 2); | |
var key = p[0].Trim(); | |
var value = p.Length > 1 ? p[1].Trim() : ""; | |
if (String.Equals(key, "consumerWindowSize", StringComparison.OrdinalIgnoreCase)) | |
{ | |
var linkCreditField = typeof(ConsumerInfo) | |
.GetField("credit", System.Reflection.BindingFlags.NonPublic | | |
System.Reflection.BindingFlags.Instance); | |
linkCreditField.SetValue(ci, Int32.Parse(value)); | |
} | |
} | |
} | |
return _prov.CreateResource(resourceInfo); | |
} | |
} | |
public IProvider CreateProvider(Uri remoteUri) | |
{ | |
var remote = remoteUri.ToString().Replace($"{Schema}://", "amqp://"); | |
return new Amqp2Provider( | |
ProviderFactory.Create(new Uri(remote)) | |
); | |
} | |
public static readonly string Schema = "amqp2"; | |
public static void Register() | |
{ | |
ProviderFactory.RegisterProviderFactory(Schema, new Amqp2ProviderFactory()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment