Skip to content

Instantly share code, notes, and snippets.

@NikunjGithub
Last active December 28, 2015 20:38
Show Gist options
  • Save NikunjGithub/7558507 to your computer and use it in GitHub Desktop.
Save NikunjGithub/7558507 to your computer and use it in GitHub Desktop.
LibRabbitMQ consumer example
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <amqp.h>
#include <amqp_framing.h>
#include "rabbitConfig.h"
#include "Exception.hpp"
//Number of messages a consumer may have cached (delivered but not yet
//acknowledged); passed as the prefetch count to amqp_basic_qos.
//if no caching is required then set to 1
//if unlimited caching is required then set to 0
//The right value depends on the design of the application: how producers and
//consumers are designed and what throughput is desired.
int PREFETCH_COUNT = 1;
//Number of consumer threads started by main()
int CONSUMER_COUNT = 5;
//Broker host and port handed to amqp_open_socket.
//"<ipaddress>" is a placeholder and must be replaced with a real address.
std::string HOST = "<ipaddress>";
int PORT = 5672;
//Virtual host and credentials handed to amqp_login
std::string VHOST = "/";
std::string USER = "guest";
std::string PASSWORD = "guest";
//Queue that is declared, bound and consumed from
std::string QUEUENAME = "queue";
//Exchange and routing key used when binding QUEUENAME
std::string EXCHANGE = "NewExchange";
std::string ROUTINGKEY = "routingKey";
static const int CHANNEL = 1; //AMQP channel number (not a count) passed to amqp_basic_qos and amqp_basic_consume
static const int MANDATORY = 0; //Not used by the consumer code below; presumably a publish flag -- confirm before relying on it
static const int IMMEDIATE = 0; //Not used by the consumer code below; presumably a publish flag -- confirm before relying on it
static const int CHANNEL_MAX = 0; //channel_max argument of amqp_login (max. no. of channels)
static const int FRAME_MAX = 131072; //frame_max argument of amqp_login (max frame size)
static const int HEART_BEAT = 0; //heartbeat argument of amqp_login (heart beat time)
static const int NO_LOCAL = 0; //no_local argument of amqp_basic_consume
static const int NO_ACK = 0; //no_ack argument of amqp_basic_consume; 0 because messages are acknowledged explicitly with amqp_basic_ack
static const int EXCLUSIVE = 0; //exclusive argument of amqp_basic_consume
//Data class
//Message payload type. connectAndConsume() copies the raw bytes of each
//received message body into an instance of this class with memcpy, so the
//in-memory layout must match what the producer sends (presumably the producer
//publishes the raw bytes of a Data object -- confirm against the publisher).
class Data{
public:
int _id; //printed as "Data id : " after each message has been copied in
// your class implementation
};
//connect to queue and start consuming messages
void connectAndConsume()
{
//new connection
amqp_connection_state_t
_conn = amqp_new_connection();
int _sockfd = amqp_open_socket(HOST.c_str(),PORT);
if (_sockfd < 0)
{
std::stringstream ss;
char* errstr = amqp_error_string(- _sockfd);
ss << "Cannot create socket: " << errstr;
free(errstr);
std::cout<<ss.str()<< __FILE__<< __LINE__<<std::endl;
}
amqp_set_sockfd(_conn,_sockfd);
//login
amqp_rpc_reply_t response = amqp_login(_conn
,VHOST.c_str()
,CHANNEL_MAX
,FRAME_MAX
,HEART_BEAT
,AMQP_SASL_METHOD_PLAIN
,USER.c_str()
,PASSWORD.c_str());
if (response.reply_type != AMQP_RESPONSE_NORMAL)
{
std::cout<<"Error while login."<</*response<<*/__FILE__<<":"<<__LINE__<<std::endl;
exit(1);
}
amqp_channel_open(_conn,1);
amqp_basic_qos_ok_t* qos = amqp_basic_qos(_conn, CHANNEL, 0, PREFETCH_COUNT ,false );
//Declare queue
amqp_queue_declare_ok_t *r
= amqp_queue_declare(_conn,
1,
amqp_cstring_bytes(QUEUENAME.c_str()),
0,
1,
0,
0,
amqp_empty_table);
amqp_rpc_reply_t ret = amqp_get_rpc_reply(_conn);
if (ret.reply_type != AMQP_RESPONSE_NORMAL)
{
std::cout<<"Error declaring queue."<</*ret<<*/__FILE__<<":"<<__LINE__<<std::endl;
}
if (r->queue.bytes == NULL) {
std::cout<<"Out of memory while copying queue name."<<__FILE__<<__LINE__<<std::endl;
}
//bind Queue for executing amqp commands
amqp_queue_bind(_conn,
1,
amqp_cstring_bytes(QUEUENAME.c_str()),
amqp_cstring_bytes(EXCHANGE.c_str()),
amqp_cstring_bytes(ROUTINGKEY.c_str()),
amqp_empty_table);
//register for consume
amqp_basic_consume(_conn,
CHANNEL,
amqp_cstring_bytes(QUEUENAME.c_str()),
amqp_empty_bytes,
NO_LOCAL,
NO_ACK,
EXCLUSIVE,
amqp_empty_table);
amqp_rpc_reply_t ret1 = amqp_get_rpc_reply(_conn);
if (ret1.reply_type != AMQP_RESPONSE_NORMAL)
{
std::cout<<"Unable to send consume command"<</*ret1<<*/ __FILE__<< __LINE__<<std::endl;
}
int result;
//Consume Messages
while(true)
{
amqp_frame_t frame;
amqp_maybe_release_buffers(_conn);
result = amqp_simple_wait_frame(_conn, &frame);
if (result < 0)
{
std::cout<<"Error in header frame"<<__FILE__<<__LINE__<<std::endl;
}
if (frame.frame_type != AMQP_FRAME_METHOD)
{
continue;
}
amqp_basic_deliver_t *d
= (amqp_basic_deliver_t *) frame.payload.method.decoded;
if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
{
continue;
}
result = amqp_simple_wait_frame(_conn, &frame);
if (result < 0)
{
std::cout<<"Message frame is invalid!"<< __FILE__<< __LINE__<<std::endl;
}
if (frame.frame_type != AMQP_FRAME_HEADER)
{
std::cout<<"Expected header!"<<__FILE__<< __LINE__<<std::endl;
}
amqp_basic_properties_t *p
= (amqp_basic_properties_t *) frame.payload.properties.decoded;
size_t body_size = frame.payload.properties.body_size;
size_t body_received = 0;
amqp_bytes_t body = amqp_bytes_malloc(body_size);
while (body_received < body_size)
{
result = amqp_simple_wait_frame(_conn, &frame);
if (result < 0) {
std::cerr<< "Wait Frame less than Zero.."<< std::endl;
}
if (frame.frame_type != AMQP_FRAME_BODY) {
std::cout<<"Expected body frame!"<< __FILE__<<__LINE__<<std::endl;
}
void* body_ptr = reinterpret_cast<char*>(body.bytes) + body_received;
memcpy(body_ptr, frame.payload.body_fragment.bytes
, frame.payload.body_fragment.len);
body_received += frame.payload.body_fragment.len;
}//while ends
if(body_received != body_size)
{
std::cerr << "Received Body is less than Body Size.." << std::endl;
}
amqp_basic_ack(_conn, 1,d->delivery_tag,0);
boost::shared_ptr<Data> _data = boost::make_shared<Data>();
memcpy( _data.get(), static_cast<char*>(body.bytes), body.len);
amqp_bytes_free(body);
std::cout<<"Data id : "<<_data->_id<<std::endl;
}
}
int main()
{
std::vector<boost::thread*> consumerList;
for(int x = 0 ; x < CONSUMER_COUNT; ++x)
{
consumerList.push_back(new boost::thread(
connectAndConsume));
}
for(std::vector<boost::thread*>::iterator itr = consumerList.begin();
itr != consumerList.end(); ++itr)
{
(*itr)->join();
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment