Skip to content

Instantly share code, notes, and snippets.

@mbroadst
Created April 1, 2016 14:28
Show Gist options
  • Save mbroadst/d7bfefb0589a3296ecca55f36af6b3f9 to your computer and use it in GitHub Desktop.
Save mbroadst/d7bfefb0589a3296ecca55f36af6b3f9 to your computer and use it in GitHub Desktop.
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>
#include <jsoncpp/json/reader.h>
#include <vector>
#include <string>
#include <iostream>
#include <sstream>
using namespace qpid::messaging;
using namespace qpid::types;
std::vector<std::string> &split(const std::string &input, char delimiter,
std::vector<std::string> &result)
{
std::stringstream stream(input);
std::string item;
while (std::getline(stream, item, delimiter))
result.push_back(item);
return result;
}
std::vector<std::string> split(const std::string &input, char delimiter)
{
std::vector<std::string> result;
split(input, delimiter, result);
return result;
}
Json::Value convertVariantMapToJson(const Variant::Map &map);
Json::Value convertVariantListToJson(const Variant::List &list);
Json::Value convertVariantToJson(const Variant &variant)
{
switch (variant.getType()) {
case VAR_VOID:
return Json::Value();
case VAR_BOOL:
return variant.asBool();
case VAR_UINT8:
case VAR_UINT16:
case VAR_UINT32:
case VAR_INT8:
case VAR_INT16:
case VAR_INT32:
return variant.asInt32();
case VAR_UINT64:
case VAR_INT64:
case VAR_UUID:
case VAR_STRING:
return variant.asString();
case VAR_FLOAT:
case VAR_DOUBLE:
return variant.asDouble();
case VAR_MAP:
return convertVariantMapToJson(variant.asMap());
case VAR_LIST:
return convertVariantListToJson(variant.asList());
}
return Json::Value();
}
Json::Value convertVariantListToJson(const Variant::List &list)
{
Json::Value result;
Variant::List::const_iterator it;
for (it = list.cbegin(); it != list.cend(); ++it) {
result.append(convertVariantToJson((*it)));
}
return result;
}
Json::Value convertVariantMapToJson(const Variant::Map &map)
{
Json::Value result;
Variant::Map::const_iterator it;
for (it = map.cbegin(); it != map.cend(); ++it) {
std::string key = (*it).first;
Variant variant = (*it).second;
result[key] = convertVariantToJson(variant);
}
return result;
}
Json::Value extractJsonBody(const Message &message)
{
Json::Value result;
Variant content = message.getContentObject();
if (content.getType() == qpid::types::VAR_STRING) {
Json::Reader reader;
std::stringstream bs(message.getContent());
if (!reader.parse(bs, result, false)) {
std::cerr << "parse error: " << reader.getFormatedErrorMessages();
return Json::Value(message.getContent());
}
} else {
result = convertVariantToJson(content);
}
return result;
}
int main(int argc, char** argv)
{
if (argc < 3) {
std::cout << "usage: " << argv[0] << " <broker> <topics>" << std::endl;
return 0;
}
std::string broker = argv[1];
std::vector<std::string> topics = split(argv[2], ',');
try {
Connection connection(broker, "{ protocol: amqp1.0 }");
connection.open();
Session session = connection.createSession();
std::vector<Receiver> receivers;
std::vector<std::string>::const_iterator it;
for (it = topics.begin(); it != topics.end(); ++it) {
Receiver receiver = session.createReceiver((*it));
receiver.setCapacity(10);
receivers.push_back(receiver);
}
while (true) {
Message message = session.nextReceiver().fetch();
Json::Value test = extractJsonBody(message);
std::cout << test.toStyledString() << std::endl;
session.acknowledge();
}
connection.close();
return 0;
} catch(const std::exception& error) {
std::cerr << error.what() << std::endl;
return 1;
}
}
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <jsoncpp/json/reader.h>
#include <iostream>
#include <iterator>
#include <sstream>
using namespace qpid::messaging;
using namespace qpid::types;
Variant::List convertJsonArrayToVariant(const Json::Value &array);
Variant::Map convertJsonObjectToVariant(const Json::Value &object);
Variant convertJsonToVariant(const Json::Value &value) {
switch (value.type()) {
case Json::nullValue: return Variant();
case Json::intValue: return Variant(value.asInt());
case Json::uintValue: return Variant(value.asUInt());
case Json::realValue: return Variant(value.asDouble());
case Json::booleanValue: return Variant(value.asBool());
case Json::stringValue: {
Variant stringVariant(value.asString());
stringVariant.setEncoding("utf8");
return stringVariant;
}
case Json::arrayValue:
return convertJsonArrayToVariant(value);
case Json::objectValue:
return convertJsonObjectToVariant(value);
}
return Variant();
}
Variant::List convertJsonArrayToVariant(const Json::Value &array)
{
Variant::List result;
Json::Value::const_iterator it;
for (it = array.begin(); it != array.end(); ++it) {
result.push_back(convertJsonToVariant((*it)));
}
return result;
}
Variant::Map convertJsonObjectToVariant(const Json::Value &object)
{
Variant::Map result;
Json::Value::Members members = object.getMemberNames();
Json::Value::Members::const_iterator it;
for (it = members.cbegin(); it != members.cend(); ++it) {
std::string key = (*it);
Variant value = convertJsonToVariant(object[(*it)]);
result.insert( std::pair<std::string, Variant>(key, value) );
}
return result;
}
Message createMessage(const std::string &input) {
Json::Value data;
Json::Reader reader;
std::stringstream bs(input);
if (!reader.parse(bs, data, false)) {
std::cerr << "parse error: " << reader.getFormatedErrorMessages();
return Message(input);
}
Message result;
result.setContentObject(convertJsonToVariant(data));
result.getContentObject().setEncoding("utf8");
return result;
}
int main(int argc, char** argv)
{
if (argc < 3) {
std::cout << "usage: " << argv[0] << " <broker> <address>" << std::endl;
return 0;
}
std::string broker = argv[1];
std::string address = argv[2];
std::cin >> std::noskipws;
std::istream_iterator<char> it(std::cin);
std::istream_iterator<char> end;
std::string data(it, end);
try {
Connection connection(broker, "{ protocol: amqp1.0 }");
connection.open();
Session session = connection.createSession();
Sender sender = session.createSender(address);
Message message = createMessage(data);
sender.send(message);
connection.close();
return 0;
} catch(const std::exception& error) {
std::cerr << error.what() << std::endl;
return 1;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment