Skip to content

Instantly share code, notes, and snippets.

@lawrencewalters
Last active May 4, 2022 16:53
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lawrencewalters/733da05519b9f249ca23 to your computer and use it in GitHub Desktop.
Save lawrencewalters/733da05519b9f249ca23 to your computer and use it in GitHub Desktop.
Pentaho - RabbitMQ integration

Pentaho - RabbitMQ on Windows Proof of Concept

RabbitMQ install

Went with the manual install (kept getting an error on erlang not being available;

Erlang could not be detected. You must install Erlang before installing RabbitMQ. Would you like the installer to open a browser window to the Erlang download site?

This was with Erlang 64bit installed, bin dir added to the path. Also tried with ERLANG_HOME set as well. No dice. So, got the RabbitMQ manual install (just a zip, really), and the commands from the sbin dir work great.

https://www.rabbitmq.com/install-windows-manual.html

Start it up

rabbitmq-server.bat

Management console

https://www.rabbitmq.com/management.html

rabbitmq-plugins enable rabbitmq_management

The web UI is located at: http://localhost:15672/ The HTTP API and its documentation are both located at: http://server-name:15672/api/ (or view our latest HTTP API documentation here). Download rabbitmqadmin at: http://server-name:15672/cli/

Pentaho Setup

Download pentaho 5.2 data integration AKA kettl. (oh yeah, I'm on windows... that's the windows link). This is the latest at the time of this writing... not sure if this is wildly different than the open source version, or what. but that's what I went with.

Got that in c:\bin\pentaho.

Enqueueing something in RabbitMQ with Bunny

I used Ruby to enqueue something. Specifically bunny gem and it's most basic sample to get something on the queue. More specifically, the Hello World example. The ruby code there has a publisher and a subscriber, but I left off the subscriber part, because that's what the Pentaho is going to do, duh.

Dequeueing something from RabbitMQ in Pentaho with this sweet PDI plugin

I found this AMQP client for pentaho data integration / kettle here: PDI-Plugin-Step-AMQP. I tried building it, but that failed on maven dependency management trying to get 4.1.4-SNAPSHOT stuff from the pentaho maven repo. But, whoever created this, also put pre-built packages together, and you can grab them from pdi-marketplace-packages, specifically ic-amqp-plugin-pdi-1.1.2-SNAPSHOT. I extracted that into my Pentaho, so I ended up with this directory:

C:\bin\Pentaho\server\data-integration-server\pentaho-solutions\system\kettle\plugins\steps\ic-amqp-plugin

This also needs to go in the Spoon (aka Data Integration) plugin directory, according to this helpful but hard to find doc concept_deploying_step_plugins

C:\bin\Pentaho\design-tools\data-integration\plugins\steps\ic-amqp-plugin

Start the service (Data Integration in services control panel)

Creating the Transformation in Spoon/Data Integration

Ok, so this is where we really de-queue something... or consume it or whatevs....

Now run Data Integration (Start -> All Programs -> Pentaho Enterprise Edition -> Design Tools -> Data Integration)

Create a new Transformation From the Design tab on the left, filter on "AMQP", which should show our sweet transformation plugin. Drag that onto the canvas Double click, fill in the details:

  • mode: consumer
  • uri: amqp://guest:guest@localhost:5672
  • body field: message
  • exchange, routing key, both are: bunny.examples.hello_world Add Text file output Double click, fill in details:
  • figure out a path to store it
  • on content tab, click "Get Fields"... this should fill in "message" and "bunny.examples.hello_world" Connect the IC AMQP to the Text file output Save the transformation Run it

Check the RabbitMQ queue - you should see the message count go down!

Oh, and this assumes you had run the bunny example once or twice to get a message on the queue.

Boom!

here's my ruby code [https://github.com/lawrencewalters/rabbitmq_pub]

and the output of my transformation:

message;my_string;my_int
"{""my_string"": ""heyo"", ""my_int"":""1""}";heyo; 1
"{""my_string"": ""heyo"", ""my_int"":""1""}";heyo; 1
"{""my_string"": ""heyo"", ""my_int"":""1""}";heyo; 1
"{""my_string"": ""heyo"", ""my_int"":""1""}";heyo; 1
"{""my_string"": ""heyo"", ""my_int"":""1""}";heyo; 1
"{""my_string"": ""heyo"", ""my_int"":""1""}";heyo; 1
"{""my_string"": ""heyo"", ""my_int"":""1""}";heyo; 1
"{""my_string"": ""heyo"", ""my_int"":""1""}";heyo; 1
"{""my_string"": ""heyo"", ""my_int"":""1""}";heyo; 1
"{""my_string"": ""heyo"", ""my_int"":""1""}";heyo; 1
"{""my_string"": ""heyo"", ""my_int"":""1""}";heyo; 1

you can see I kept the original message, and then the two parsed fields.

@guilhermeegg
Copy link

Hi,
How are you?

I'm try conumer amqp with pentaho.
I'm getting the error below, maybe you can help me.

2021/12/09 19:31:51 - IC AMQP Plugin.0 - ERROR (version 8.2.0.0-342, build 8.2.0.0-342 from 2018-11-14 10.30.55 by buildguy) : Erro inesperado
2021/12/09 19:31:51 - IC AMQP Plugin.0 - ERROR (version 8.2.0.0-342, build 8.2.0.0-342 from 2018-11-14 10.30.55 by buildguy) : java.lang.NullPointerException
2021/12/09 19:31:51 - IC AMQP Plugin.0 - at com.instaclick.pentaho.plugin.amqp.AMQPPlugin.consume(AMQPPlugin.java:179)
2021/12/09 19:31:51 - IC AMQP Plugin.0 - at com.instaclick.pentaho.plugin.amqp.AMQPPlugin.processRow(AMQPPlugin.java:81)
2021/12/09 19:31:51 - IC AMQP Plugin.0 - at org.pentaho.di.trans.step.RunThread.run(RunThread.java:62)
2021/12/09 19:31:51 - IC AMQP Plugin.0 - at java.lang.Thread.run(Unknown Source)

Pentaho version 9.2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment