Skip to content

Instantly share code, notes, and snippets.

@solo-seven
Created September 3, 2011 15:33
Show Gist options
  • Save solo-seven/1191351 to your computer and use it in GitHub Desktop.
Save solo-seven/1191351 to your computer and use it in GitHub Desktop.
package com.redhat.msg;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.naming.InitialContext;
import javax.naming.NameNotFoundException;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.sql.DataSource;
import javax.sql.XADataSource;
import javax.transaction.UserTransaction;
/**
* Servlet implementation class MessageProcessor
*/
public class MessageProcessor extends HttpServlet {
private static final long serialVersionUID = 1L;
private final ArrayList<ScenarioProcessor> processors = new ArrayList<ScenarioProcessor>();
/**
* @see HttpServlet#HttpServlet()
*/
public MessageProcessor() {
super();
}
/**
* @see Servlet#init(ServletConfig)
*/
public void init(ServletConfig config) throws ServletException {
processors.add(new ScenarioProcessor("/queue/FirstQueue"));
processors.add(new ScenarioProcessor("/queue/FirstQueue"));
processors.add(new ScenarioProcessor("/queue/SecondQueue"));
processors.add(new ScenarioProcessor("/queue/SecondQueue"));
for(ScenarioProcessor processor : processors) {
new Thread(processor).start();
}
}
/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse response)
*/
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
for(ScenarioProcessor processor : processors) {
processor.change();
}
}
private class ScenarioProcessor implements Runnable {
private final String queueName;
private boolean run = true;
public ScenarioProcessor(String queueName) {
this.queueName = queueName;
}
public void change() { this.run = !this.run; }
public void run() {
while(this.run) {
try {
boolean result = this.checkMessage();
try {
if(!result) Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
}
} catch(Exception e) {
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException ex) {
}
}
}
}
private boolean checkMessage() {
try {
InitialContext ic = new InitialContext();
XAConnectionFactory cf;
UserTransaction ut;
DataSource ds;
try {
cf = (XAConnectionFactory)ic.lookup("XAConnectionFactory");
ut = (UserTransaction)ic.lookup("UserTransaction");
ds = (DataSource)ic.lookup("java:/LocktestDS");
} catch(NameNotFoundException nnfe) {
return false;
}
ut.begin();
XAConnection conn = null;
XASession sess = null;
java.sql.Connection sqlConn = null;
PreparedStatement stmt = null;
boolean result = true;
try {
conn = cf.createXAConnection();
Queue queue = (Queue)ic.lookup(this.queueName);
conn.start();
sess = conn.createXASession();
MessageConsumer consumer = sess.createConsumer(queue);
TextMessage msg = (TextMessage) consumer.receive(10);
if(msg != null) {
System.out.println("Message value : " + msg.getText());
sqlConn = ds.getConnection();
stmt = sqlConn.prepareStatement("insert into locktest_data(value) values(?)");
stmt.setString(1, msg.getText());
stmt.executeUpdate();
} else {
result = false;
}
} catch(Exception ex) {
ut.rollback();
throw ex;
} finally {
if(sess != null) sess.close();
if(conn != null) conn.close();
if(stmt != null) stmt.close();
if(sqlConn != null) sqlConn.close();
}
ut.commit();
return result;
} catch(Exception e) {
e.printStackTrace();
return false;
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment