Created
December 9, 2013 18:37
-
-
Save giovannibonetti/7878008 to your computer and use it in GitHub Desktop.
Here I have some code I used in a Pentaho Kettle (a.k.a. PDI) transformation for communicating with RabbitMQ.
Output step => Pentaho-Kettle-RabbitMQ-Producer.java
Input step => Pentaho-Kettle-RabbitMQ-Consumer.java
(non-blocking). How to use it:
1) Prepare your Kettle transformation;
2) Download RabbitMQ's Java client (only the main Java file is…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.rabbitmq.client.Connection; | |
import com.rabbitmq.client.Channel; | |
import com.rabbitmq.client.ConnectionFactory; | |
//import com.rabbitmq.client.QueueingConsumer; | |
import com.rabbitmq.client.GetResponse; | |
import com.rabbitmq.client.AMQP; | |
//import com.rabbitmq.client.*; | |
//import com.rabbitmq.client.MessageProperties; | |
//import com.rabbitmq.client.AlreadyClosedException; | |
import com.rabbitmq.client.ConsumerCancelledException; | |
import com.rabbitmq.client.ShutdownSignalException; | |
//import java.io.FileNotFoundException; | |
import java.io.IOException; | |
//private FieldHelper inputField = null; | |
private FieldHelper outputField = null; | |
Connection connection = null; | |
Channel channel = null; | |
//ConnectionFactory factory = null; | |
String encoding = "UTF-8"; | |
String exchangeName = null; | |
String routingKey = null; | |
String queueName = null; | |
boolean autoAck = false; //false if the server should expect explicit acknowledgements | |
//QueueingConsumer consumer = null; | |
//QueueingConsumer.Delivery delivery; | |
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException | |
{ | |
// Let's look up parameters only once for performance reason. | |
if (first) { | |
//inputField = get(Fields.In, getParameter("INPUT_FIELD")); | |
outputField = get(Fields.Out, getParameter("OUTPUT_FIELD")); | |
first = false; | |
} | |
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large | |
// enough to handle any new fields you are creating in this step. | |
//Object[] outputRow = createOutputRow(r, data.outputRowMeta.size()); | |
//Object[] r = RowDataUtil.allocateRowData(data.outputRowMeta.size()); | |
Object[] outputRow = createOutputRow(new Object[0], data.outputRowMeta.size()); | |
try { | |
//RabbitMQ - consume message | |
//delivery = consumer.nextDelivery(); | |
// @throws InterruptedException if an interrupt is received while waiting | |
// @throws ShutdownSignalException if the connection is shut down while waiting | |
// @throws ConsumerCancelledException if this consumer is cancelled while waiting | |
//String message = new String(delivery.getBody()); | |
GetResponse response = channel.basicGet(queueName, autoAck); | |
// @throws java.io.IOException if an error is encountered | |
if (response == null) { | |
// No message retrieved => stop transformation | |
setOutputDone(); | |
return false; | |
} else { | |
AMQP.BasicProperties props = response.getProps(); | |
//byte[] body = response.getBody(); | |
String messageBody = new String (response.getBody(), encoding); | |
long deliveryTag = response.getEnvelope().getDeliveryTag(); | |
//Everything ok (no error) -> prepare output row | |
//get(Fields.Out, "message").setValue(outputRow, messageBody); | |
//outputRow[0] = messageBody; | |
outputField.setValue(outputRow, messageBody); | |
putRow(data.outputRowMeta, outputRow); | |
//RabbitMQ - acknowledge message was processed correctly | |
// @throws java.io.IOException if an error is encountered | |
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); | |
channel.basicAck(deliveryTag, false); // acknowledge receipt of the message | |
return true; | |
} | |
//} catch (InterruptedException | ShutdownSignalException | ConsumerCancelledException ex) { | |
// Multiple exception catching works in Java 7+, but not in PDI 4.4 | |
//} catch (InterruptedException ex) { | |
//putError(data.outputRowMeta, outputRow, 1, ex.toString(), "","" ); | |
//return true; | |
//} catch (ShutdownSignalException ex) { | |
//putError(data.outputRowMeta, outputRow, 1, ex.toString(), "","" ); | |
//return true; | |
//} catch (ConsumerCancelledException ex) { | |
//putError(data.outputRowMeta, outputRow, 1, ex.toString(), "","" ); | |
//return true; | |
} catch (IOException ex) { | |
putError(data.outputRowMeta, outputRow, 1, ex.toString(), "","" ); | |
return true; | |
} | |
} | |
public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) | |
{ | |
//if (super.init(stepMetaInterface, stepDataInterface)) { | |
if (parent.initImpl(stepMetaInterface, stepDataInterface)){ | |
try { | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setUsername(getParameter("AMQP_USERNAME")); | |
factory.setPassword(getParameter("AMQP_PASSWORD")); | |
factory.setVirtualHost("/"); | |
factory.setHost(getParameter("AMQP_HOST")); | |
factory.setPort(5672); | |
connection = factory.newConnection(); | |
channel = connection.createChannel(); | |
boolean durable = true; //=survive server restart | |
//channel.exchangeDeclare(exchangeName, "topic"); | |
exchangeName = getParameter("EXCHANGE_NAME"); | |
channel.exchangeDeclare(exchangeName, "topic", durable); | |
routingKey = getParameter("ROUTING_KEY"); | |
queueName = getParameter("QUEUE_NAME"); | |
boolean exclusive = false; // Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes | |
boolean autoDelete = false; //If true, the exchange is deleted when all queues have finished using it. | |
channel.queueDeclare(queueName, durable, exclusive, autoDelete, null); | |
//String queueName = channel.queueDeclare().getQueue(); //a non-durable, exclusive, autodelete queue with a generated name | |
channel.queueBind(queueName, exchangeName, routingKey); | |
// Blocking Consumer | |
//consumer = new QueueingConsumer(channel); | |
//channel.basicConsume(queueName, autoAck, consumer); | |
return true; | |
} catch (InterruptedException ex) { | |
logError("InterruptedException: ", ex); | |
//putError(data.outputRowMeta, dataRows, 1, ex.toString(), "","" ); | |
return false; | |
} catch (NumberFormatException ex) { | |
logError("NumberFormatException: ", ex); | |
//putError(data.outputRowMeta, dataRows, 1, ex.toString(), "","" ); | |
return false; | |
} catch (ShutdownSignalException ex) { | |
logError("ShutdownSignalException: ", ex); | |
//putError(data.outputRowMeta, dataRows, 1, ex.toString(), "","" ); | |
return false; | |
} catch (ConsumerCancelledException ex) { | |
logError("ConsumerCancelledException: ", ex); | |
//putError(data.outputRowMeta, dataRows, 1, ex.toString(), "","" ); | |
return false; | |
} catch (NullPointerException ex) { | |
logError("NullPointerException: ", ex); | |
//putError(data.outputRowMeta, dataRows, 1, ex.toString(), "","" ); | |
return false; | |
} catch (Exception ex) { | |
logError("Exception: ", ex); | |
//putError(data.outputRowMeta, dataRows, 1, ex.toString(), "","" ); | |
return false; | |
} | |
} | |
return false; | |
} | |
public void dispose(StepMetaInterface smi, StepDataInterface sdi) | |
{ | |
if ((channel != null) || (connection != null)) { | |
try{ | |
channel.close(); | |
connection.close(); | |
} catch (IOException ex) { | |
logError("IOException: ", ex); | |
} | |
} | |
parent.disposeImpl(smi, sdi); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.rabbitmq.client.Connection; | |
import com.rabbitmq.client.Channel; | |
import com.rabbitmq.client.ConnectionFactory; | |
//import com.rabbitmq.client.*; | |
import com.rabbitmq.client.MessageProperties; | |
import com.rabbitmq.client.AlreadyClosedException; | |
import com.rabbitmq.client.ConsumerCancelledException; | |
import com.rabbitmq.client.ShutdownSignalException; | |
//import java.io.FileNotFoundException; | |
import java.io.IOException; | |
private FieldHelper inputField = null; | |
private FieldHelper outputField = null; | |
//Integer firstnameIndex = null; | |
//Integer nameIndex = null; | |
//ConnectionFactory factory = null; | |
String encoding = "UTF-8"; | |
String exchangeName = null; | |
String routingKey = null; | |
Connection connection = null; | |
Channel channel = null; | |
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException | |
{ | |
// First, get a row from the default input hop | |
Object[] r = getRow(); | |
// If the row object is null, we are done processing. | |
if (r == null) { | |
setOutputDone(); | |
return false; | |
} | |
// Let's look up parameters only once for performance reason. | |
if (first) { | |
inputField = get(Fields.In, getParameter("INPUT_FIELD")); | |
// firstnameIndex = getInputRowMeta().indexOfValue(firstnameField); | |
// lastnameIndex = getInputRowMeta().indexOfValue(lastnameField); | |
// if ((firstnameIndex<0) || (lastnameIndex<0)) { | |
// throw new KettleException("(User Defined Javascript) field not found in the input row\!"); | |
// } | |
first=false; | |
} | |
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large | |
// enough to handle any new fields you are creating in this step. | |
Object[] outputRow = createOutputRow(r, data.outputRowMeta.size()); | |
try{ | |
//RabbitMQ - send message | |
// @throws java.io.IOException if an error is encountered | |
String messageBody = inputField.getString(r); | |
channel.basicPublish(exchangeName, routingKey, null, messageBody.getBytes(encoding)); | |
//byte[] messageBodyBytes = "Hello, world!".getBytes(); | |
//channel.basicPublish(exchangeName, routingKey | |
// ,MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes) ; | |
//Everything ok (no error) -> prepare output row | |
//String output_value = messageBody; | |
//outputField.setValue(outputRow, output_value); | |
//putRow(data.outputRowMeta, outputRow); | |
return true; | |
} catch (IOException ex) { | |
//logError("IOException: ", ex); | |
putError(data.outputRowMeta, outputRow, 1, ex.toString(), "","" ); | |
return true; | |
} | |
} | |
public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) | |
{ | |
//if (super.init(stepMetaInterface, stepDataInterface)) { | |
if (parent.initImpl(stepMetaInterface, stepDataInterface)){ | |
try { | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setUsername(getParameter("AMQP_USERNAME")); | |
factory.setPassword(getParameter("AMQP_PASSWORD")); | |
factory.setVirtualHost("/"); | |
factory.setHost(getParameter("AMQP_HOST")); | |
factory.setPort(5672); | |
connection = factory.newConnection(); | |
channel = connection.createChannel(); | |
boolean durable = true; //=survive server restart | |
String exchangeName = getParameter("EXCHANGE_NAME"); | |
//channel.exchangeDeclare(exchangeName, "topic"); | |
channel.exchangeDeclare(exchangeName, "topic", durable); | |
String routingKey = getParameter("ROUTING_KEY"); | |
return true; | |
} catch (InterruptedException ex) { | |
logError("InterruptedException: ", ex); | |
//putError(data.outputRowMeta, dataRows, 1, ex.toString(), "","" ); | |
return false; | |
} catch (NumberFormatException ex) { | |
logError("NumberFormatException: ", ex); | |
//putError(data.outputRowMeta, dataRows, 1, ex.toString(), "","" ); | |
return false; | |
} catch (ShutdownSignalException ex) { | |
logError("ShutdownSignalException: ", ex); | |
//putError(data.outputRowMeta, dataRows, 1, ex.toString(), "","" ); | |
return false; | |
} catch (ConsumerCancelledException ex) { | |
logError("ConsumerCancelledException: ", ex); | |
//putError(data.outputRowMeta, dataRows, 1, ex.toString(), "","" ); | |
return false; | |
} catch (NullPointerException ex) { | |
logError("NullPointerException: ", ex); | |
//putError(data.outputRowMeta, dataRows, 1, ex.toString(), "","" ); | |
return false; | |
} catch (Exception ex) { | |
logError("Exception: ", ex); | |
//putError(data.outputRowMeta, dataRows, 1, ex.toString(), "","" ); | |
return false; | |
} | |
} | |
return false; | |
} | |
public void dispose(StepMetaInterface smi, StepDataInterface sdi) | |
{ | |
if ((channel != null) || (connection != null)) { | |
try{ | |
channel.close(); | |
connection.close(); | |
} catch (IOException ex) { | |
logError("IOException: ", ex); | |
} | |
} | |
parent.disposeImpl(smi, sdi); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment