Skip to content

Instantly share code, notes, and snippets.

@lulf
Created August 5, 2016 11:00
Show Gist options
  • Save lulf/58990f3552648b0efabc0f69c9803bfa to your computer and use it in GitHub Desktop.
Save lulf/58990f3552648b0efabc0f69c9803bfa to your computer and use it in GitHub Desktop.
JMS clients
package org.apache.qpid.jms.example;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Main {
public static void main(String [] args) throws InterruptedException {
if (args.length < 1) {
throw new IllegalArgumentException("You must supply count");
} else {
int count = Integer.parseInt(args[0]);
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.execute(() -> Receiver.receive("broker1", count));
executor.execute(() -> Receiver.receive("broker2", count));
System.out.println("Started subscribers");
Thread.sleep(10000);
System.out.println("Starting publisher");
executor.execute(() -> Sender.send("broker1", count));
executor.shutdown();
executor.awaitTermination(1, TimeUnit.DAYS);
}
}
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.jms.example;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
public class Receiver {
public static void receive(String brokerId, int count) {
try {
// The configuration for the Qpid InitialContextFactory has been supplied in
// a jndi.properties file in the classpath, which results in it being picked
// up automatically by the InitialContext constructor.
Context context = new InitialContext();
ConnectionFactory factory = (ConnectionFactory) context.lookup(brokerId);
Topic topic = (Topic)context.lookup("myTopicLookup");
Connection connection = factory.createConnection();
connection.setExceptionListener(new MyExceptionListener());
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Topic topic = session.createTopic("mytopic");
MessageConsumer messageConsumer = session.createConsumer(topic);
int actualCount = 0;
for (int i = 1; i <= count; i++, actualCount++) {
Message message = messageConsumer.receive();
System.out.println("Got message " + i + " from " + brokerId);
}
connection.close();
} catch (Exception exp) {
System.out.println("Caught exception, exiting.");
exp.printStackTrace(System.out);
System.exit(1);
}
}
private static class MyExceptionListener implements ExceptionListener {
@Override
public void onException(JMSException exception) {
System.out.println("Connection ExceptionListener fired, exiting.");
exception.printStackTrace(System.out);
System.exit(1);
}
}
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.jms.example;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
public class Sender {
private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
public static void send(String brokerId, int count) {
try {
// The configuration for the Qpid InitialContextFactory has been supplied in
// a jndi.properties file in the classpath, which results in it being picked
// up automatically by the InitialContext constructor.
Context context = new InitialContext();
ConnectionFactory factory = (ConnectionFactory) context.lookup(brokerId);
Topic topic = (Topic)context.lookup("myTopicLookup");
Connection connection = factory.createConnection();
connection.setExceptionListener(new MyExceptionListener());
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(topic);
for (int i = 1; i <= count; i++) {
TextMessage message = session.createTextMessage("Text!");
messageProducer.send(message, DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
System.out.println("Sent message " + i + " to " + brokerId);
}
connection.close();
} catch (Exception exp) {
System.out.println("Caught exception, exiting.");
exp.printStackTrace(System.out);
System.exit(1);
}
}
private static class MyExceptionListener implements ExceptionListener {
@Override
public void onException(JMSException exception) {
System.out.println("Connection ExceptionListener fired, exiting.");
exception.printStackTrace(System.out);
System.exit(1);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment