Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@chirino
Created October 16, 2015 17:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chirino/2266b8b12df3d2058292 to your computer and use it in GitHub Desktop.
Save chirino/2266b8b12df3d2058292 to your computer and use it in GitHub Desktop.
package examples;
import io.vertx.core.AsyncResult;
import io.vertx.core.AsyncResultHandler;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.message.Message;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
*
*/
public class AmqpExample {
public static void main(String[] args) throws InterruptedException {
// Connection are logical and kinda independent of the Socket (they can even switch sockets).
final Connection connection = Proton.connection();
connection.setContainer("client-id:1");
connection.open();
Session session = connection.session();
session.open();
Target target = new Target();
target.setAddress("queue://foo");
Sender sender = session.sender("link1");
sender.setTarget(target);
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
sender.open();
Delivery delivery1 = sendMessage(sender, "msg:1", "Helo World");
Vertx vertx = Vertx.vertx();
NetClient client = connect(vertx, connection, "localhost", 5672);
// The proton engine API is not thread safe.. so at this
// point this would not be safe since we need to synchronize /w the
// actions the NetClient is doing.
Delivery delivery2 = sendMessage(sender, "msg:2", "Helo World 2");
Thread.sleep(1000);
}
private static NetClient connect(Vertx vertx, final Connection connection, String localhost, int port) {
// Since the connection is not yet attached to a socket, all the above actions queued up
final NetClient client = vertx.createNetClient();
client.connect(port, localhost, new AsyncResultHandler<NetSocket>() {
final Transport transport = Proton.transport();
NetSocket socket;
ByteBuffer pendingFromVertx;
@Override
public void handle(AsyncResult<NetSocket> event) {
if (event.succeeded()) {
transport.bind(connection);
socket = event.result();
socket.handler(new Handler<Buffer>() {
@Override
public void handle(Buffer event) {
assert pendingFromVertx == null;
pendingFromVertx = ByteBuffer.wrap(event.getBytes());
socket.pause();
// Have proton process bytes from the network
pumpPendingFromVertx();
// at this point we really should call back to user logic
// to handle processing the updated AMQP state.
// Proton my want to reply in response to those bytes.. so
// send bytes out of proton into vert.x
pumpPendingToVertx();
}
});
socket.drainHandler(new Handler<Void>() {
@Override
public void handle(Void event) {
pumpPendingToVertx();
}
});
pumpPendingToVertx();
} else {
System.out.println("Failed to connect: " + event.cause());
System.exit(1);
}
}
private void pumpPendingToVertx() {
ByteBuffer outputBuffer = transport.getOutputBuffer();
while( !socket.writeQueueFull() && outputBuffer.hasRemaining() ) {
byte buffer[] = new byte[outputBuffer.remaining()];
outputBuffer.get(buffer);
socket.write(Buffer.buffer(buffer));
transport.outputConsumed();
}
}
private void pumpPendingFromVertx() {
// Lets push bytes from vert.x to proton engine.
ByteBuffer inputBuffer = transport.getInputBuffer();
while (pendingFromVertx.hasRemaining() && inputBuffer.hasRemaining()) {
inputBuffer.put(pendingFromVertx.get());
}
transport.processInput().checkIsOk();
if (!pendingFromVertx.hasRemaining()) {
pendingFromVertx = null;
socket.resume();
}
}
});
return client;
}
static private Delivery sendMessage(Sender sender, String deliveryTag, String messageBody) {
int BUFFER_SIZE = 1024;
Message m = Proton.message();
m.setBody(new AmqpValue(messageBody));
byte[] encodedMessage = new byte[BUFFER_SIZE];
int len = m.encode(encodedMessage, 0, BUFFER_SIZE);
byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
Delivery serverDelivery = sender.delivery(tag);
sender.send(encodedMessage, 0, len);
sender.advance();
return serverDelivery;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment