Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save giovannibonetti/7878008 to your computer and use it in GitHub Desktop.
Save giovannibonetti/7878008 to your computer and use it in GitHub Desktop.
This is code I used in a Pentaho Kettle (a.k.a. PDI) transformation to communicate with RabbitMQ. Output step: Pentaho-Kettle-RabbitMQ-Producer.java. Input step: Pentaho-Kettle-RabbitMQ-Consumer.java (non-blocking). How to use it: 1) Prepare your Kettle transformation; 2) Download RabbitMQ's Java client (only the main Java file is…
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
//import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.AMQP;
//import com.rabbitmq.client.*;
//import com.rabbitmq.client.MessageProperties;
//import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.ShutdownSignalException;
//import java.io.FileNotFoundException;
import java.io.IOException;
// Helper for writing the output field named by the OUTPUT_FIELD parameter; resolved on the first processRow call.
private FieldHelper outputField = null;
// AMQP connection and channel: opened in init(), closed in dispose().
Connection connection = null;
Channel channel = null;
// Charset name used to decode message bodies into strings.
String encoding = "UTF-8";
// Exchange, routing key and queue names; read from the step parameters in init().
String exchangeName = null;
String routingKey = null;
String queueName = null;
boolean autoAck = false; //false if the server should expect explicit acknowledgements
// Polls the queue once (non-blocking) and, if a message is available, emits it as one output row.
// Returns false (after signalling end of output) when no message was retrieved, true otherwise.
// An IOException from the broker calls is routed to the step's error handling via putError.
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    // Look up the output field helper only once, for performance reasons.
    if (first) {
        outputField = get(Fields.Out, getParameter("OUTPUT_FIELD"));
        first = false;
    }

    // No input row is read here, so start from an empty row sized for the output metadata.
    Object[] outputRow = createOutputRow(new Object[0], data.outputRowMeta.size());

    try {
        GetResponse response = channel.basicGet(queueName, autoAck);
        if (response == null) {
            // No message retrieved => stop the transformation.
            setOutputDone();
            return false;
        }

        String messageBody = new String(response.getBody(), encoding);
        long deliveryTag = response.getEnvelope().getDeliveryTag();

        outputField.setValue(outputRow, messageBody);
        putRow(data.outputRowMeta, outputRow);

        // Acknowledge only after the row was handed downstream, and only in explicit-ack mode:
        // a message fetched with autoAck=true must not be acknowledged a second time.
        if (!autoAck) {
            channel.basicAck(deliveryTag, false);
        }
        return true;
    } catch (IOException ex) {
        // Multi-catch is not available in PDI 4.4, hence a single plain catch clause.
        putError(data.outputRowMeta, outputRow, 1, ex.toString(), "", "");
        return true;
    }
}
// Initializes the step: opens the AMQP connection and channel, declares the durable topic
// exchange and the durable queue named by the step parameters, and binds the queue to the
// exchange with the configured routing key.
// Returns true only when the base initialization and the whole broker setup succeeded;
// any exception is logged and reported as a failed initialization.
public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface)
{
    // Nothing to set up when the base step initialization fails.
    if (!parent.initImpl(stepMetaInterface, stepDataInterface)) {
        return false;
    }

    try {
        ConnectionFactory amqpFactory = new ConnectionFactory();
        amqpFactory.setUsername(getParameter("AMQP_USERNAME"));
        amqpFactory.setPassword(getParameter("AMQP_PASSWORD"));
        amqpFactory.setVirtualHost("/");
        amqpFactory.setHost(getParameter("AMQP_HOST"));
        amqpFactory.setPort(5672);

        connection = amqpFactory.newConnection();
        channel = connection.createChannel();

        // Durable = survives a server restart; used for both the exchange and the queue.
        boolean durable = true;
        exchangeName = getParameter("EXCHANGE_NAME");
        channel.exchangeDeclare(exchangeName, "topic", durable);

        routingKey = getParameter("ROUTING_KEY");
        queueName = getParameter("QUEUE_NAME");

        // Exclusive queues may only be accessed by the current connection and are deleted when it closes.
        boolean exclusive = false;
        boolean autoDelete = false;
        channel.queueDeclare(queueName, durable, exclusive, autoDelete, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        return true;
    } catch (InterruptedException ex) {
        logError("InterruptedException: ", ex);
        return false;
    } catch (NumberFormatException ex) {
        logError("NumberFormatException: ", ex);
        return false;
    } catch (ShutdownSignalException ex) {
        logError("ShutdownSignalException: ", ex);
        return false;
    } catch (ConsumerCancelledException ex) {
        logError("ConsumerCancelledException: ", ex);
        return false;
    } catch (NullPointerException ex) {
        logError("NullPointerException: ", ex);
        return false;
    } catch (Exception ex) {
        logError("Exception: ", ex);
        return false;
    }
}
// Releases the AMQP channel and connection, then delegates to the base step disposal.
// Each resource is checked and closed on its own, so a partially completed init()
// (connection created, channel not) cannot cause a NullPointerException, and a failure
// while closing the channel does not prevent the connection from being closed.
public void dispose(StepMetaInterface smi, StepDataInterface sdi)
{
    if (channel != null) {
        try {
            channel.close();
        } catch (IOException ex) {
            logError("IOException: ", ex);
        } catch (ShutdownSignalException ex) {
            // Channel was already shut down (e.g. closed by the broker); nothing left to release.
            logError("ShutdownSignalException: ", ex);
        }
        channel = null;
    }
    if (connection != null) {
        try {
            connection.close();
        } catch (IOException ex) {
            logError("IOException: ", ex);
        } catch (ShutdownSignalException ex) {
            // Connection was already shut down; nothing left to release.
            logError("ShutdownSignalException: ", ex);
        }
        connection = null;
    }
    parent.disposeImpl(smi, sdi);
}
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
//import com.rabbitmq.client.*;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.ShutdownSignalException;
//import java.io.FileNotFoundException;
import java.io.IOException;
// Helper for reading the input field named by the INPUT_FIELD parameter; resolved on the first processRow call.
private FieldHelper inputField = null;
// Not assigned or read by any active code in this step (only referenced in commented-out code).
private FieldHelper outputField = null;
// Charset name used to encode message bodies before publishing.
String encoding = "UTF-8";
// Exchange and routing key passed to basicPublish in processRow.
String exchangeName = null;
String routingKey = null;
// AMQP connection and channel: opened in init(), closed in dispose().
Connection connection = null;
Channel channel = null;
// Reads one row from the input hop and publishes the configured input field as a message.
// Returns false (after signalling end of output) when the input is exhausted, true otherwise.
// Rows are not forwarded downstream; a row whose publish raises an IOException is sent to
// the step's error handling via putError.
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] inputRow = getRow();

    // A null row means the upstream steps are finished.
    if (inputRow == null) {
        setOutputDone();
        return false;
    }

    // Resolve the input field helper once, on the first row.
    if (first) {
        inputField = get(Fields.In, getParameter("INPUT_FIELD"));
        first = false;
    }

    // Row sized for the output metadata; only used when reporting an error.
    Object[] errorRow = createOutputRow(inputRow, data.outputRowMeta.size());

    try {
        String payload = inputField.getString(inputRow);
        byte[] payloadBytes = payload.getBytes(encoding);
        channel.basicPublish(exchangeName, routingKey, null, payloadBytes);
    } catch (IOException ex) {
        putError(data.outputRowMeta, errorRow, 1, ex.toString(), "", "");
    }
    return true;
}
// Initializes the step: opens the AMQP connection and channel and declares the durable
// topic exchange named by the EXCHANGE_NAME parameter.
// Returns true only when the base initialization and the broker setup succeeded;
// any exception is logged and reported as a failed initialization.
public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface)
{
    if (parent.initImpl(stepMetaInterface, stepDataInterface)) {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername(getParameter("AMQP_USERNAME"));
            factory.setPassword(getParameter("AMQP_PASSWORD"));
            factory.setVirtualHost("/");
            factory.setHost(getParameter("AMQP_HOST"));
            factory.setPort(5672);
            connection = factory.newConnection();
            channel = connection.createChannel();
            boolean durable = true; //=survive server restart
            // Assign the step-level fields, not new locals: processRow reads these fields
            // when publishing, and shadowing them here would leave both null.
            exchangeName = getParameter("EXCHANGE_NAME");
            channel.exchangeDeclare(exchangeName, "topic", durable);
            routingKey = getParameter("ROUTING_KEY");
            return true;
        } catch (InterruptedException ex) {
            logError("InterruptedException: ", ex);
            return false;
        } catch (NumberFormatException ex) {
            logError("NumberFormatException: ", ex);
            return false;
        } catch (ShutdownSignalException ex) {
            logError("ShutdownSignalException: ", ex);
            return false;
        } catch (ConsumerCancelledException ex) {
            logError("ConsumerCancelledException: ", ex);
            return false;
        } catch (NullPointerException ex) {
            logError("NullPointerException: ", ex);
            return false;
        } catch (Exception ex) {
            logError("Exception: ", ex);
            return false;
        }
    }
    return false;
}
// Releases the AMQP channel and connection, then delegates to the base step disposal.
// Each resource is checked and closed on its own, so a partially completed init()
// (connection created, channel not) cannot cause a NullPointerException, and a failure
// while closing the channel does not prevent the connection from being closed.
public void dispose(StepMetaInterface smi, StepDataInterface sdi)
{
    if (channel != null) {
        try {
            channel.close();
        } catch (IOException ex) {
            logError("IOException: ", ex);
        } catch (ShutdownSignalException ex) {
            // Channel was already shut down (e.g. closed by the broker); nothing left to release.
            logError("ShutdownSignalException: ", ex);
        }
        channel = null;
    }
    if (connection != null) {
        try {
            connection.close();
        } catch (IOException ex) {
            logError("IOException: ", ex);
        } catch (ShutdownSignalException ex) {
            // Connection was already shut down; nothing left to release.
            logError("ShutdownSignalException: ", ex);
        }
        connection = null;
    }
    parent.disposeImpl(smi, sdi);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment