Created
September 3, 2011 15:33
-
-
Save solo-seven/1191351 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.redhat.msg; | |
import java.io.IOException; | |
import java.sql.PreparedStatement; | |
import java.util.ArrayList; | |
import javax.jms.MessageConsumer; | |
import javax.jms.Queue; | |
import javax.jms.TextMessage; | |
import javax.jms.XAConnection; | |
import javax.jms.XAConnectionFactory; | |
import javax.jms.XASession; | |
import javax.naming.InitialContext; | |
import javax.naming.NameNotFoundException; | |
import javax.servlet.ServletConfig; | |
import javax.servlet.ServletException; | |
import javax.servlet.http.HttpServlet; | |
import javax.servlet.http.HttpServletRequest; | |
import javax.servlet.http.HttpServletResponse; | |
import javax.sql.DataSource; | |
import javax.sql.XADataSource; | |
import javax.transaction.UserTransaction; | |
/** | |
* Servlet implementation class MessageProcessor | |
*/ | |
public class MessageProcessor extends HttpServlet { | |
private static final long serialVersionUID = 1L; | |
private final ArrayList<ScenarioProcessor> processors = new ArrayList<ScenarioProcessor>(); | |
/** | |
* @see HttpServlet#HttpServlet() | |
*/ | |
public MessageProcessor() { | |
super(); | |
} | |
/** | |
* @see Servlet#init(ServletConfig) | |
*/ | |
public void init(ServletConfig config) throws ServletException { | |
processors.add(new ScenarioProcessor("/queue/FirstQueue")); | |
processors.add(new ScenarioProcessor("/queue/FirstQueue")); | |
processors.add(new ScenarioProcessor("/queue/SecondQueue")); | |
processors.add(new ScenarioProcessor("/queue/SecondQueue")); | |
for(ScenarioProcessor processor : processors) { | |
new Thread(processor).start(); | |
} | |
} | |
/** | |
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse response) | |
*/ | |
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { | |
for(ScenarioProcessor processor : processors) { | |
processor.change(); | |
} | |
} | |
private class ScenarioProcessor implements Runnable { | |
private final String queueName; | |
private boolean run = true; | |
public ScenarioProcessor(String queueName) { | |
this.queueName = queueName; | |
} | |
public void change() { this.run = !this.run; } | |
public void run() { | |
while(this.run) { | |
try { | |
boolean result = this.checkMessage(); | |
try { | |
if(!result) Thread.sleep(10 * 1000); | |
} catch (InterruptedException e) { | |
} | |
} catch(Exception e) { | |
try { | |
Thread.sleep(10 * 1000); | |
} catch (InterruptedException ex) { | |
} | |
} | |
} | |
} | |
private boolean checkMessage() { | |
try { | |
InitialContext ic = new InitialContext(); | |
XAConnectionFactory cf; | |
UserTransaction ut; | |
DataSource ds; | |
try { | |
cf = (XAConnectionFactory)ic.lookup("XAConnectionFactory"); | |
ut = (UserTransaction)ic.lookup("UserTransaction"); | |
ds = (DataSource)ic.lookup("java:/LocktestDS"); | |
} catch(NameNotFoundException nnfe) { | |
return false; | |
} | |
ut.begin(); | |
XAConnection conn = null; | |
XASession sess = null; | |
java.sql.Connection sqlConn = null; | |
PreparedStatement stmt = null; | |
boolean result = true; | |
try { | |
conn = cf.createXAConnection(); | |
Queue queue = (Queue)ic.lookup(this.queueName); | |
conn.start(); | |
sess = conn.createXASession(); | |
MessageConsumer consumer = sess.createConsumer(queue); | |
TextMessage msg = (TextMessage) consumer.receive(10); | |
if(msg != null) { | |
System.out.println("Message value : " + msg.getText()); | |
sqlConn = ds.getConnection(); | |
stmt = sqlConn.prepareStatement("insert into locktest_data(value) values(?)"); | |
stmt.setString(1, msg.getText()); | |
stmt.executeUpdate(); | |
} else { | |
result = false; | |
} | |
} catch(Exception ex) { | |
ut.rollback(); | |
throw ex; | |
} finally { | |
if(sess != null) sess.close(); | |
if(conn != null) conn.close(); | |
if(stmt != null) stmt.close(); | |
if(sqlConn != null) sqlConn.close(); | |
} | |
ut.commit(); | |
return result; | |
} catch(Exception e) { | |
e.printStackTrace(); | |
return false; | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment