Skip to content

Instantly share code, notes, and snippets.

@OleTraveler
Created February 21, 2011 17:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save OleTraveler/837393 to your computer and use it in GitHub Desktop.
Save OleTraveler/837393 to your computer and use it in GitHub Desktop.
Wrapper around JMS to commit messages after a threshold has been met in order to commit messages in blocks.
package com.gaiam.gcsi.util;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
/**
* Closes and reopens the connection after 800 messages. This is not thread safe, at all.
* This is basically like the template method, in that you implement {@link MessageFactory}
* (usually as in anonymous class) and pass it to the {@link #send} method.
* Don't forget to call {@link #closeConnection()} when you are done.
*
* @author tstevens
*/
public class JmsControl {
public static interface MessageFactory {
public Message createMessage(Session session) throws JMSException;
}
private final ConnectionFactory connectionFactory;
private final Queue queue;
private final int maxNumMsgs = 800;
private Session session;
private MessageProducer producer;
private Connection con;
private int msgsSent = 0;
public JmsControl(ConnectionFactory connectionFactory, Queue queue) {
this.connectionFactory = connectionFactory;
this.queue = queue;
}
public void send(MessageFactory mFactory) {
try {
if (!isConnectionOpen()) {
createConnection();
}
Message m = mFactory.createMessage(session);
producer.send(m);
if (msgsSent++ >= maxNumMsgs) {
closeConnection();
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
public void closeConnection() {
if (isConnectionOpen()) {
try {
session.close();
con.close();
con = null;
session = null;
producer = null;
} catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
}
private boolean isConnectionOpen() {
return con != null;
}
private void createConnection() throws JMSException {
con = connectionFactory.createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(queue);
msgsSent = 0;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment