@ejlp12
Last active November 24, 2015 23:29

AMQP demo using JBoss A-MQ 6.2

Add the following line to the file etc/activemq.xml

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/>

or

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
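
The second variant carries two options in the URI query string. In the XML file the separator is written as &amp;, which is the XML escape for a plain &. The following is a minimal sketch, not part of A-MQ, that splits such a URI into its options and shows that 104857600 bytes is 100 MiB. The class name ConnectorUriOptions and the method parseOptions are hypothetical names chosen for illustration.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

public class ConnectorUriOptions {

    /** Splits the query part of a connector URI into name/value pairs. */
    public static Map<String, String> parseOptions(String uri) {
        Map<String, String> options = new LinkedHashMap<>();
        String query = URI.create(uri).getQuery();
        if (query == null || query.isEmpty()) {
            return options; // no options, as in the first variant
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                options.put(pair, "");
            } else {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return options;
    }

    public static void main(String[] args) {
        // The XML attribute value with &amp; already unescaped to &.
        String uri = "amqp://0.0.0.0:5672?maximumConnections=1000"
                + "&wireFormat.maxFrameSize=104857600";
        Map<String, String> options = parseOptions(uri);
        long frameBytes = Long.parseLong(options.get("wireFormat.maxFrameSize"));
        System.out.println(options);
        System.out.println("maxFrameSize in MiB: " + frameBytes / (1024 * 1024));
    }
}
```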

Start A-MQ

cd /Servers/A-MQ_6.2/bin
./amq
JBossA-MQ:karaf@root> log:display |grep amqp
2015-11-19 04:07:33,873 | INFO  | pool-13-thread-1 | TransportServerThreadSupport     | ort.TransportServerThreadSupport   69 | 171 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-620133 | Listening for connections at: amqp://ejlp-macbook:5672
2015-11-19 04:07:33,874 | INFO  | pool-13-thread-1 | TransportConnector               | tivemq.broker.TransportConnector  260 | 171 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-620133 | Connector amqp started
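
Besides reading the log, the connector can be checked from outside the broker. The sketch below is an illustration only, assuming the broker runs on localhost with the port 5672 configured above. It opens a TCP connection, sends the 8-byte AMQP 1.0 protocol header, and checks whether the reply also starts with "AMQP". The class AmqpProbe and its methods are hypothetical names, and how a particular broker answers (for example with a SASL header) may differ.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class AmqpProbe {

    /** AMQP 1.0 protocol header: "AMQP", protocol id 0, version 1.0.0. */
    public static byte[] protocolHeader() {
        return new byte[] {'A', 'M', 'Q', 'P', 0, 1, 0, 0};
    }

    /** True if the first four bytes of the reply are the ASCII text "AMQP". */
    public static boolean looksLikeAmqp(byte[] reply, int length) {
        return length >= 4
                && reply[0] == 'A' && reply[1] == 'M'
                && reply[2] == 'Q' && reply[3] == 'P';
    }

    /** Sends the header and reports whether the peer answers with an AMQP header. */
    public static boolean probe(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);
            OutputStream out = socket.getOutputStream();
            out.write(protocolHeader());
            out.flush();
            byte[] reply = new byte[8];
            InputStream in = socket.getInputStream();
            int read = in.readNBytes(reply, 0, reply.length);
            return looksLikeAmqp(reply, read);
        } catch (IOException e) {
            return false; // connection refused, timed out, or closed early
        }
    }

    public static void main(String[] args) {
        // Assumed host and port; adjust to match the transportConnector.
        System.out.println("AMQP connector reachable: " + probe("localhost", 5672, 2000));
    }
}
```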

Clone the example project, which contains the Producer and Consumer programs

git clone https://github.com/FuseByExample/activemq-amqp-example.git

cd activemq-amqp-example
mvn -P consumer

The Consumer fails with this error:

04:16:22 ERROR Error receiving message
javax.naming.NamingException [Root exception is java.net.MalformedURLException: Unknown URL option: 'brokerlist' in connection URL]
	at org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory.getInitialContext(PropertiesFileInitialContextFactory.java:126)
...
Caused by: java.net.MalformedURLException: Unknown URL option: 'brokerlist' in connection URL
	at org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl$OptionSetter.parseOptions(ConnectionFactoryImpl.java:291)
...

This is the same problem as the one posted here: http://stackoverflow.com/questions/14979184/client-failover-for-qpid-amqp-jms-client

The problem occurs because the connection URI uses a format that no longer conforms to AMQP 1.0; that format is only valid for AMQP versions before 1.0. To fix it, edit the file src/main/resources/jndi.properties

Change the following line:

connectionfactory.myJmsFactory = amqp://admin:admin@localhost/test/?brokerlist='tcp://localhost:5672'

to

connectionfactory.myJmsFactory = amqp://admin:admin@localhost:5672?remote-host=default
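
The difference between the two lines can be checked programmatically. This is a rough sketch, assuming only the two property lines shown above. It reads a jndi.properties text with java.util.Properties, flags a URL that still carries the brokerlist option, and prints the host and port of the corrected URL. The class JndiUrlCheck and its methods are hypothetical helpers, not part of the Qpid client.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Properties;

public class JndiUrlCheck {

    /** Reads one property value from jndi.properties-style text. */
    public static String factoryUrl(String propertiesText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    /** True if the URL still uses the 'brokerlist' option rejected in the error above. */
    public static boolean usesLegacyBrokerList(String url) {
        return url != null && url.toLowerCase().contains("brokerlist=");
    }

    public static void main(String[] args) {
        String key = "connectionfactory.myJmsFactory";
        String before = key + " = amqp://admin:admin@localhost/test/?brokerlist='tcp://localhost:5672'";
        String after = key + " = amqp://admin:admin@localhost:5672?remote-host=default";

        System.out.println("before uses brokerlist: " + usesLegacyBrokerList(factoryUrl(before, key)));
        System.out.println("after uses brokerlist:  " + usesLegacyBrokerList(factoryUrl(after, key)));

        // In the corrected form, host and port are part of the URI authority.
        URI fixed = URI.create(factoryUrl(after, key));
        System.out.println(fixed.getHost() + ":" + fixed.getPort());
    }
}
```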

Run the command to start the Consumer again

cd activemq-amqp-example
mvn -P consumer

If it succeeds without errors, text like this appears:

07:01:03 INFO  Start consuming messages from org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b with 120000ms timeout

Then run the command to start the Producer

[WARNING] Warning: killAfter is now deprecated. Do you need it ? Please comment on MEXEC-6.
07:03:06 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '1. message sent
07:03:06 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '2. message sent
07:03:07 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '3. message sent
07:03:07 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '4. message sent
07:03:07 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '5. message sent
07:03:07 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '6. message sent
07:03:07 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '7. message sent
07:03:07 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '8. message sent
07:03:07 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '9. message sent
07:03:07 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '10. message sent
... [continues until 100 messages have been sent to A-MQ]

In the Consumer console, you will see the following output:

07:03:21 INFO  Got 1. message: 1. message sent
07:03:21 INFO  Got 2. message: 2. message sent
07:03:21 INFO  Got 3. message: 3. message sent
07:03:21 INFO  Got 4. message: 4. message sent
07:03:21 INFO  Got 5. message: 5. message sent
07:03:21 INFO  Got 6. message: 6. message sent
07:03:21 INFO  Got 7. message: 7. message sent
07:03:21 INFO  Got 8. message: 8. message sent
07:03:21 INFO  Got 9. message: 9. message sent
07:03:21 INFO  Got 10. message: 10. message sent
... [and so on until 100 messages have been received]
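
To confirm that every message arrived in order without reading the output by eye, the Consumer log can be scanned. The following is a small sketch, assuming the log lines have exactly the "Got N. message: N. message sent" shape shown above. The class ConsumerLogCheck and the method countInOrder are hypothetical names used for illustration.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ConsumerLogCheck {

    // Matches the Consumer lines, e.g. "Got 3. message: 3. message sent".
    private static final Pattern GOT =
            Pattern.compile("Got (\\d+)\\. message: (\\d+)\\. message sent");

    /**
     * Returns how many messages were received in the order 1, 2, 3, ...
     * or -1 at the first gap or mismatch. Lines that do not match are skipped.
     */
    public static int countInOrder(List<String> lines) {
        int expected = 1;
        for (String line : lines) {
            Matcher m = GOT.matcher(line);
            if (!m.find()) {
                continue;
            }
            int received = Integer.parseInt(m.group(1));
            int sent = Integer.parseInt(m.group(2));
            if (received != expected || sent != expected) {
                return -1;
            }
            expected++;
        }
        return expected - 1;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "07:03:21 INFO  Got 1. message: 1. message sent",
                "07:03:21 INFO  Got 2. message: 2. message sent",
                "07:03:21 INFO  Got 3. message: 3. message sent");
        System.out.println("messages received in order: " + countInOrder(sample));
    }
}
```

For the full demo run, the expected result would be 100.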

http://mvnrepository.com/artifact/org.apache.qpid/qpid-amqp-1-0-client-jms

An interesting article about the process of creating the AMQP protocol standard: http://www.imatix.com/articles:whats-wrong-with-amqp/


ejlp12 commented Nov 24, 2015

A more up-to-date example:

  • Edit etc/users.properties to add a user guest with password guest and the admin role

    guest=guest,admin
    
  • Start A-MQ

  • Clone the example repository, build it, and run the example class:

    git clone https://github.com/apache/qpid-jms.git
    cd qpid-jms/ 
    mvn clean package dependency:copy-dependencies -DincludeScope=runtime -DskipTests
    cd qpid-jms-examples/
    java -cp "target/classes/:target/dependency/*" org.apache.qpid.jms.example.HelloWorld
    

    A successful run prints "Hello world!"

  • See the code:

    cat src/main/java/org/apache/qpid/jms/example/HelloWorld.java
    
            Context context = new InitialContext();
    
            ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
            Destination queue = (Destination) context.lookup("myQueueLookup");
    
            Connection connection = factory.createConnection(USER, PASSWORD);
            connection.setExceptionListener(new MyExceptionListener());
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            MessageProducer messageProducer = session.createProducer(queue);
            MessageConsumer messageConsumer = session.createConsumer(queue);
    
            TextMessage message = session.createTextMessage("Hello world!");
            messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
            TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L);
    

    and the configuration:

    cat src/main/resources/jndi.properties
    
    connectionfactory.myFactoryLookup = amqp://localhost:5672
    queue.myQueueLookup = queue
    topic.myTopicLookup = topic
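
The keys in this file follow a prefix convention: the part after connectionfactory., queue., or topic. is the name passed to context.lookup(...) in the code above, and the value is the connection URI or the destination name. The sketch below only illustrates that mapping with java.util.Properties; it does not use the Qpid JMS client, and the class JndiLookupNames and the method entriesWithPrefix are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class JndiLookupNames {

    /** Returns lookup name -> value for every key that starts with the given prefix. */
    public static Map<String, String> entriesWithPrefix(String propertiesText, String prefix) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same content as the jndi.properties shown above.
        String text = String.join("\n",
                "connectionfactory.myFactoryLookup = amqp://localhost:5672",
                "queue.myQueueLookup = queue",
                "topic.myTopicLookup = topic");
        System.out.println("factories: " + entriesWithPrefix(text, "connectionfactory."));
        System.out.println("queues:    " + entriesWithPrefix(text, "queue."));
        System.out.println("topics:    " + entriesWithPrefix(text, "topic."));
    }
}
```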
    

Reference: https://access.redhat.com/documentation/en-US/Red_Hat_JBoss_A-MQ/6.2/html/Client_Connectivity_Guide/AMQP.html#AMQP-Start-Clients
