Skip to content

Instantly share code, notes, and snippets.

@berlinbrown
Created July 4, 2021 20:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save berlinbrown/09cc15b4c4904bfbe391374fb663c606 to your computer and use it in GitHub Desktop.
Save berlinbrown/09cc15b4c4904bfbe391374fb663c606 to your computer and use it in GitHub Desktop.
Basic IBM MQ with threading
package pipeline.mockdump;
import com.ibm.mq.*;
import com.ibm.mq.constants.CMQC;
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import java.util.Random;
/**
* Program Name
* MQTest11B
* <p>
* Description
* This java class will connect to a remote queue manager with the
* MQ setting stored in a HashTable, put 2 message on a queue with unique CorrelIds
* and then retrieve the message with a CorrelId of "0002".
* <p>
* Sample Command Line Parameters
* -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
*
* @author Roger Lacroix
*/
public class SimpleAppMQAgain {
private static final SimpleDateFormat LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
private Hashtable<String, String> params;
private Hashtable<String, Object> mqht;
private String qMgrName;
private String outputQName;
private boolean running = true;
/**
* The constructor
*/
public SimpleAppMQAgain() {
super();
params = new Hashtable<String, String>();
mqht = new Hashtable<String, Object>();
}
/**
* Make sure the required parameters are present.
*
* @return true/false
*/
private boolean allParamsPresent() {
boolean b = params.containsKey("-h") && params.containsKey("-p") &&
params.containsKey("-c") && params.containsKey("-m") &&
params.containsKey("-q") &&
params.containsKey("-u") && params.containsKey("-x");
if (b) {
try {
Integer.parseInt((String) params.get("-p"));
} catch (NumberFormatException e) {
b = false;
}
}
return b;
}
/**
* Extract the command-line parameters and initialize the MQ HashTable.
*
* @param args
* @throws IllegalArgumentException
*/
private void init(String[] args) throws IllegalArgumentException {
int port = 1414;
if (args.length > 0 && (args.length % 2) == 0) {
for (int i = 0; i < args.length; i += 2) {
params.put(args[i], args[i + 1]);
}
} else {
throw new IllegalArgumentException();
}
if (allParamsPresent()) {
qMgrName = (String) params.get("-m");
outputQName = (String) params.get("-q");
try {
port = Integer.parseInt((String) params.get("-p"));
} catch (NumberFormatException e) {
port = 1414;
}
mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
mqht.put(CMQC.PORT_PROPERTY, new Integer(port));
mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));
// I don't want to see MQ exceptions at the console.
MQException.log = null;
} else {
throw new IllegalArgumentException();
}
}
/**
* Connect, open queue, write a message, close queue and disconnect.
*/
private void testSendAndReceive() {
MQQueueManager qMgr = null;
MQQueue queue = null;
int openOptions = CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING
| CMQC.MQOO_INQUIRE | CMQC.MQOO_BROWSE;
final MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = CMQC.MQPMO_NO_SYNCPOINT | CMQC.MQPMO_FAIL_IF_QUIESCING | CMQC.MQPMO_ASYNC_RESPONSE;
MQMessage sendmsg;
String msgData;
DecimalFormat df = new DecimalFormat("0000");
try {
qMgr = new MQQueueManager(qMgrName, mqht);
logger("successfully connected to " + qMgrName);
queue = qMgr.accessQueue(outputQName, openOptions);
logger("successfully opened " + outputQName);
this.listenMessages(queue);
Thread.sleep(8000);
System.out.println("... ~~~ sending messages ________");
for (int i = 0; i < 4; i++) {
// Define a simple MQ message, and write some text
sendmsg = new MQMessage();
sendmsg.format = CMQC.MQFMT_STRING;
sendmsg.messageId = CMQC.MQMI_NONE;
sendmsg.correlationId = df.format(2).getBytes();
// Write message data
msgData = "This is a test message from MQTest11c. CorrelID is " + new String(sendmsg.correlationId + " ; " + new Date());
sendmsg.writeString(msgData);
// put the message on the queue
System.out.println(" >>>> entering putting message");
queue.put(sendmsg, pmo);
System.out.println(" <--- exiting putting message");
logger("Sent: Message Data>>>" + msgData);
}
//this.simpleGet(queue);
/*
* Code to send 2 messages with a specific CorrelId. i.e. 0001 and 0002
*/
for (int i = 0; i < 1; i++) {
// Define a simple MQ message, and write some text
sendmsg = new MQMessage();
sendmsg.format = CMQC.MQFMT_STRING;
sendmsg.messageId = CMQC.MQMI_NONE;
//sendmsg.correlationId = df.format(i + 1).getBytes();
sendmsg.correlationId = df.format(2).getBytes();
// Write message data
msgData = "(set2) This is a test message from MQTest11c. CorrelID is " + new String(sendmsg.correlationId + " ; " + new Date());
sendmsg.writeString(msgData);
// put the message on the queue
System.out.println(" ~~===> Entering put");
queue.put(sendmsg, pmo);
System.out.println(" <<<===~~ Leaving put");
logger("Sent: Message Data>>>" + msgData);
}
Thread.sleep(1000);
running = false;
System.out.println("End of testSendAndReceive ");
} catch (MQException e) {
logger("CC=" + e.completionCode + " : RC=" + e.reasonCode);
} catch (IOException e) {
logger("IOException:" + e.getLocalizedMessage());
} catch (Exception e) {
logger("InterruptedException:" + e.getLocalizedMessage());
} finally {
try {
if (queue != null) {
// Possibly hanging here (never left the "get, locked" ????
queue.close();
logger("closed: " + outputQName);
}
} catch (MQException e) {
logger("CC=" + e.completionCode + " : RC=" + e.reasonCode);
}
try {
if (qMgr != null) {
qMgr.disconnect();
logger("disconnected from " + qMgrName);
}
} catch (MQException e) {
logger("CC=" + e.completionCode + " : RC=" + e.reasonCode);
}
}
}
@Deprecated
public void simpleGet(MQQueue queue) {
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_CONVERT | CMQC.MQGMO_FAIL_IF_QUIESCING;
gmo.matchOptions = CMQC.MQMO_MATCH_CORREL_ID;
gmo.waitInterval = CMQC.MQWI_UNLIMITED;
logger("``...``");
logger("Waiting for messages .... ");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
// Define a simple MQ message, and write some text
MQMessage receiveMsg = new MQMessage();
receiveMsg.messageId = CMQC.MQMI_NONE;
receiveMsg.correlationId = "0002".getBytes();
try {
// get the message on the queue
queue.get(receiveMsg, gmo);
if (CMQC.MQFMT_STRING.equals(receiveMsg.format)) {
String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength());
logger("<<<< Received (done waiting on get): Message Data <<<< " + msgStr);
} else {
byte[] b = new byte[receiveMsg.getMessageLength()];
receiveMsg.readFully(b);
logger("Received: Message Data>>>" + new String(b));
}
} catch (Exception e) {
logger("Error at get message: " + e.getMessage());
}
}
public void listenMessages(MQQueue queue) {
/*
* Code to receive a message with a specific CorrelId. i.e. 0002
*/
Runnable r = () -> {
while (running) {
//logger("``...``");
//logger("Waiting for messages .... ");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
final MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_CONVERT | CMQC.MQGMO_FAIL_IF_QUIESCING;
gmo.matchOptions = CMQC.MQMO_MATCH_CORREL_ID;
//gmo.waitInterval = CMQC.MQWI_UNLIMITED;
gmo.waitInterval = 1000;
// Define a simple MQ message, and write some text
MQMessage receiveMsg = new MQMessage();
receiveMsg.messageId = CMQC.MQMI_NONE;
receiveMsg.correlationId = "0002".getBytes();
try {
final int currentDepth = queue.getCurrentDepth();
System.out.println("getCurrentDepth: " + currentDepth);
if (currentDepth > 0) {
// get the message on the queue
System.out.println(" ---> Entering get");
queue.get(receiveMsg, gmo);
System.out.println(" <<<-- Exiting get");
if (CMQC.MQFMT_STRING.equals(receiveMsg.format)) {
String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength());
logger("<<<< Received (done waiting on get): Message Data <<<< " + msgStr);
} else {
byte[] b = new byte[receiveMsg.getMessageLength()];
receiveMsg.readFully(b);
logger("Received: Message Data>>>" + new String(b));
}
}
} catch (Exception e) {
logger("Error at get message: " + e.getMessage());
}
}
};
new Thread(r).start();
System.out.println("Exiting listen messages");
}
/**
* A simple logger method
*
* @param data
*/
public static void logger(String data) {
String className = Thread.currentThread().getStackTrace()[2].getClassName();
// Remove the package info.
if ((className != null) && (className.lastIndexOf('.') != -1))
className = className.substring(className.lastIndexOf('.') + 1);
System.out.println(LOGGER_TIMESTAMP.format(new Date()) + " " + className + ": " + Thread.currentThread().getStackTrace()[2].getMethodName() + ": " + data);
}
/**
* main line
*
* @param args
*/
public static void main(String[] args) {
SimpleAppMQAgain write = new SimpleAppMQAgain();
try {
final String[] myargs = {
"-m",
"QM1",
"-h",
"127.0.0.1",
"-p",
"1414",
"-c",
"DEV.ADMIN.SVRCONN",
"-q",
"DEV.QUEUE.1",
"-u",
"admin",
"-x",
"passw0rd"
};
write.init(myargs);
write.testSendAndReceive();
} catch (IllegalArgumentException e) {
logger("Usage: java MQTest11B -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password");
System.exit(1);
}
System.out.println("Running exit ....");
System.exit(0);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment