Created
February 21, 2011 17:37
-
-
Save OleTraveler/837393 to your computer and use it in GitHub Desktop.
Wrapper around JMS to commit messages after a threshold has been met in order to commit messages in blocks.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.gaiam.gcsi.util; | |
import javax.jms.Connection; | |
import javax.jms.ConnectionFactory; | |
import javax.jms.JMSException; | |
import javax.jms.Message; | |
import javax.jms.MessageProducer; | |
import javax.jms.Queue; | |
import javax.jms.Session; | |
/** | |
* Closes and reopens the connection after 800 messages. This is not thread safe, at all. | |
* This is basically like the template method, in that you implement {@link MessageFactory} | |
* (usually as in anonymous class) and pass it to the {@link #send} method. | |
* Don't forget to call {@link #closeConnection()} when you are done. | |
* | |
* @author tstevens | |
*/ | |
public class JmsControl { | |
public static interface MessageFactory { | |
public Message createMessage(Session session) throws JMSException; | |
} | |
private final ConnectionFactory connectionFactory; | |
private final Queue queue; | |
private final int maxNumMsgs = 800; | |
private Session session; | |
private MessageProducer producer; | |
private Connection con; | |
private int msgsSent = 0; | |
public JmsControl(ConnectionFactory connectionFactory, Queue queue) { | |
this.connectionFactory = connectionFactory; | |
this.queue = queue; | |
} | |
public void send(MessageFactory mFactory) { | |
try { | |
if (!isConnectionOpen()) { | |
createConnection(); | |
} | |
Message m = mFactory.createMessage(session); | |
producer.send(m); | |
if (msgsSent++ >= maxNumMsgs) { | |
closeConnection(); | |
} | |
} catch (JMSException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public void closeConnection() { | |
if (isConnectionOpen()) { | |
try { | |
session.close(); | |
con.close(); | |
con = null; | |
session = null; | |
producer = null; | |
} catch (JMSException ex) { | |
throw new RuntimeException(ex); | |
} | |
} | |
} | |
private boolean isConnectionOpen() { | |
return con != null; | |
} | |
private void createConnection() throws JMSException { | |
con = connectionFactory.createConnection(); | |
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); | |
producer = session.createProducer(queue); | |
msgsSent = 0; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment