Skip to content

Instantly share code, notes, and snippets.

@nopolabs
Last active June 3, 2016 08:57
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nopolabs/6490963 to your computer and use it in GitHub Desktop.
Save nopolabs/6490963 to your computer and use it in GitHub Desktop.
Sample code demonstrating reading and writing to Apollo MQ using Java, but failure to read messages posted to Apollo MQ from PHP using STOMP.
package test;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Apollo {
public static void main(String[] args) throws JMSException {
String user = "guest";
String pass = "guest";
String broker = "tcp://127.0.0.1:61616";
String queue = "test";
String text = "hello!";
// demonstrate send and receive work
send(user, pass, broker, queue, text);
receive(user, pass, broker, queue);
System.out.println("waiting for a message delivered by STOMP using send.php");
receive(user, pass, broker, queue);
}
private static void receive(String user, String pass, String broker, String queue) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, pass, broker);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(
false, // NON_TRANSACTED
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queue);
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive();
System.out.println("Received message [" + message + "]");
connection.close();
}
private static void send(String user, String pass, String broker, String queue, String text) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, pass, broker);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(
false, // NON_TRANSACTED
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queue);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(text);
producer.send(message);
System.out.println("Sent message [" + message + "]");
connection.close();
}
}
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
<notes>Apollo configuration</notes>
<log_category console="console" security="security" connection="connection" audit="audit"/>
<authentication domain="apollo"/>
<access_rule allow="*" action="*"/>
<virtual_host id="demo">
<host_name>localhost</host_name>
<host_name>127.0.0.1</host_name>
<leveldb_store directory="${apollo.base}/data/demo"/>
<queue id="test"/>
</virtual_host>
<web_admin bind="http://127.0.0.1:61680"/>
<web_admin bind="https://0.0.0.0:61681"/>
<connector id="tcp-61616" bind="tcp://0.0.0.0:61616" connection_limit="2000"/>
<key_storage file="${apollo.base}/etc/keystore" password="password" key_password="password"/>
</broker>
<?php
require_once("Stomp.php");
$port = 61616;
$dest = "/queue/test";
function recv($dest, $port) {
$con = new Stomp("tcp://localhost:$port");
$con->connect("guest", "guest");
$con->subscribe($dest);
$msg = $con->readFrame();
if ( $msg != null) {
echo "Received from $dest:\n";
print_r($msg);
if ($msg instanceof StompFrame) {
$con->ack($msg->headers['message-id']);
} else {
$con->ack($msg);
}
} else {
echo "Failed to receive a message\n";
}
$con->disconnect();
}
recv($dest, $port);
?>
<?php
require_once("Stomp.php"); // http://stomp.fusesource.org/download.html
$port = 61616;
$dest = "/queue/test";
function send($msg, $dest, $port) {
$con = new Stomp("tcp://localhost:$port");
$con->connect("guest", "guest");
$headers = array();
$con->send($dest, $msg, $headers);
$con->disconnect();
}
send("world", $dest, $port);
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment