Skip to content

Instantly share code, notes, and snippets.

@scholzj
Last active September 8, 2015 16:04

Revisions

  1. scholzj revised this gist Sep 8, 2015. 1 changed file with 5 additions and 1 deletion.
    6 changes: 5 additions & 1 deletion RequestResponse.cs
    Original file line number Diff line number Diff line change
    @@ -3,6 +3,7 @@
    using Amqp;
    using Amqp.Sasl;
    using Amqp.Framing;
    using Amqp.Types;
    using System.Security.Cryptography.X509Certificates;
    using System.Net.Security;
    using System.Threading.Tasks;
    @@ -49,7 +50,10 @@ static async Task<int> SslConnectionTestAsync()
    Session session = new Session(connection);

    SenderLink sender = new SenderLink(session, "request-sender", "request.ABCFR_ABCFRALMMACC1");
    ReceiverLink receiver = new ReceiverLink(session, "response-receiver", "response.ABCFR_ABCFRALMMACC1");

    Map filters = new Map();
    filters.Add(new Symbol("apache.org:selector-filter:string"), new DescribedValue(new Symbol("apache.org:selector-filter:string"), "amqp.correlation_id='123456'"));
    ReceiverLink receiver = new ReceiverLink(session, "response-receiver", new Source() { Address = "response.CBKFR_TESTCALMMACC1", FilterSet = filters}, null);

    Message request = new Message("Hello world!");
    request.Header = new Header();
  2. scholzj revised this gist Sep 8, 2015. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion BroadcastReceiver.cs
    Original file line number Diff line number Diff line change
    @@ -48,7 +48,7 @@ static async Task<int> SslConnectionTestAsync()

    Session session = new Session(connection);

    ReceiverLink receiver = new ReceiverLink(session, "helloworld-receiver", "broadcast.ABCFR_ABCFRALMMACC1.TradeConfirmation");
    ReceiverLink receiver = new ReceiverLink(session, "broadcast-receiver", "broadcast.ABCFR_ABCFRALMMACC1.TradeConfirmation");

    while (true)
    {
  3. scholzj revised this gist Sep 8, 2015. 2 changed files with 11 additions and 3 deletions.
    8 changes: 6 additions & 2 deletions BroadcastReceiver.cs
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,5 @@
    using System;
    using System.Text;
    using Amqp;
    using Amqp.Sasl;
    using Amqp.Framing;
    @@ -57,8 +58,11 @@ static async Task<int> SslConnectionTestAsync()
    {
    break;
    }

    Console.WriteLine("Received message: {0}", msg.ToString());

    Amqp.Framing.Data payload = (Amqp.Framing.Data)response.BodySection;
    String payloadText = Encoding.UTF8.GetString(payload.Binary);

    Console.WriteLine("Received message: {0}", payloadText);
    receiver.Accept(msg);
    }

    6 changes: 5 additions & 1 deletion RequestResponse.cs
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,5 @@
    using System;
    using System.Text;
    using Amqp;
    using Amqp.Sasl;
    using Amqp.Framing;
    @@ -62,7 +63,10 @@ static async Task<int> SslConnectionTestAsync()

    if (response != null)
    {
    Console.WriteLine("Received response message: {0} with body {1} and with correlation ID {2}", response.ToString(), response.Body.ToString(), response.Properties.CorrelationId);
    Amqp.Framing.Data payload = (Amqp.Framing.Data)response.BodySection;
    String payloadText = Encoding.UTF8.GetString(payload.Binary);

    Console.WriteLine("Received response message: with body {1} and with correlation ID {2}", payloadText, response.Properties.CorrelationId);
    receiver.Accept(response);
    }
    else
  4. scholzj created this gist Sep 6, 2015.
    84 changes: 84 additions & 0 deletions BroadcastReceiver.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,84 @@
    using System;
    using Amqp;
    using Amqp.Sasl;
    using Amqp.Framing;
    using System.Security.Cryptography.X509Certificates;
    using System.Net.Security;
    using System.Threading.Tasks;

    namespace BroadcastReceiver
    {
    class BroadcastReceiver
    {
    public static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
    {
    if (sslPolicyErrors == SslPolicyErrors.None)
    return true;

    Console.WriteLine("Certificate error: {0}", sslPolicyErrors);

    return false;
    }


    static async Task<int> SslConnectionTestAsync()
    {
    try
    {
    ConnectionFactory factory = new ConnectionFactory();

    String certFile = "c:\\Users\\JAkub\\Downloads\\ABCFR_ABCFRALMMACC1.crt";
    factory.SSL.RemoteCertificateValidationCallback = ValidateServerCertificate;
    factory.SSL.LocalCertificateSelectionCallback = (a, b, c, d, e) => X509Certificate.CreateFromCertFile(certFile);
    factory.SSL.ClientCertificates.Add(X509Certificate.CreateFromCertFile(certFile));

    factory.AMQP.MaxFrameSize = 64 * 1024;
    factory.AMQP.ContainerId = "fixml-client";

    factory.SASL.Profile = SaslProfile.External;

    Trace.TraceLevel = TraceLevel.Frame;
    Trace.TraceListener = (f, a) => Console.WriteLine(String.Format(f, a));

    Connection.DisableServerCertValidation = false;

    Address brokerAddress = new Address("amqps://ecag-fixml-dev1:5671");
    Connection connection = await factory.CreateAsync(brokerAddress);

    Session session = new Session(connection);

    ReceiverLink receiver = new ReceiverLink(session, "helloworld-receiver", "broadcast.ABCFR_ABCFRALMMACC1.TradeConfirmation");

    while (true)
    {
    Message msg = receiver.Receive(60000);

    if (msg == null)
    {
    break;
    }

    Console.WriteLine("Received message: {0}", msg.ToString());
    receiver.Accept(msg);
    }

    Console.WriteLine("No message received for 60 seconds");

    await connection.CloseAsync();

    return 0;
    }
    catch (Exception e)
    {
    Console.WriteLine("Exception {0}.", e);
    return 1;
    }
    }

    static void Main(string[] args)
    {
    Task<int> task = SslConnectionTestAsync();
    task.Wait();
    }
    }
    }
    90 changes: 90 additions & 0 deletions RequestResponse.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,90 @@
    using System;
    using Amqp;
    using Amqp.Sasl;
    using Amqp.Framing;
    using System.Security.Cryptography.X509Certificates;
    using System.Net.Security;
    using System.Threading.Tasks;

    namespace RequestRepsonse
    {
    class RequestRepsonse
    {
    public static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
    {
    if (sslPolicyErrors == SslPolicyErrors.None)
    return true;

    Console.WriteLine("Certificate error: {0}", sslPolicyErrors);

    return false;
    }


    static async Task<int> SslConnectionTestAsync()
    {
    try
    {
    ConnectionFactory factory = new ConnectionFactory();

    String certFile = "c:\\Users\\JAkub\\Downloads\\ABCFR_ABCFRALMMACC1.crt";
    factory.SSL.RemoteCertificateValidationCallback = ValidateServerCertificate;
    factory.SSL.LocalCertificateSelectionCallback = (a, b, c, d, e) => X509Certificate.CreateFromCertFile(certFile);
    factory.SSL.ClientCertificates.Add(X509Certificate.CreateFromCertFile(certFile));

    factory.AMQP.MaxFrameSize = 64 * 1024;
    factory.AMQP.ContainerId = "fixml-client";

    factory.SASL.Profile = SaslProfile.External;

    Trace.TraceLevel = TraceLevel.Frame;
    Trace.TraceListener = (f, a) => Console.WriteLine(String.Format(f, a));

    Connection.DisableServerCertValidation = false;

    Address brokerAddress = new Address("amqps://ecag-fixml-dev1:5671");
    Connection connection = await factory.CreateAsync(brokerAddress);

    Session session = new Session(connection);

    SenderLink sender = new SenderLink(session, "request-sender", "request.ABCFR_ABCFRALMMACC1");
    ReceiverLink receiver = new ReceiverLink(session, "response-receiver", "response.ABCFR_ABCFRALMMACC1");

    Message request = new Message("Hello world!");
    request.Header = new Header();
    request.Header.Durable = true;
    request.Properties = new Properties();
    request.Properties.CorrelationId = "123456";
    request.Properties.ReplyTo = "response/response.ABCFR_ABCFRALMMACC1";
    sender.Send(request);

    Message response = receiver.Receive(60000);

    if (response != null)
    {
    Console.WriteLine("Received response message: {0} with body {1} and with correlation ID {2}", response.ToString(), response.Body.ToString(), response.Properties.CorrelationId);
    receiver.Accept(response);
    }
    else
    {
    Console.WriteLine("No message received for 60 seconds");
    }

    await connection.CloseAsync();

    return 0;
    }
    catch (Exception e)
    {
    Console.WriteLine("Exception {0}.", e);
    return 1;
    }
    }

    static void Main(string[] args)
    {
    Task<int> task = SslConnectionTestAsync();
    task.Wait();
    }
    }
    }