Skip to content

Instantly share code, notes, and snippets.

@ricejasonf
Created November 26, 2016 07:06
Show Gist options
  • Save ricejasonf/c9335d86b169bcca9d73f57ea1b97a37 to your computer and use it in GitHub Desktop.
Save ricejasonf/c9335d86b169bcca9d73f57ea1b97a37 to your computer and use it in GitHub Desktop.
# TODO use in house docker images for compilers
FROM ricejasonf/clang
WORKDIR /usr/local/src
# asio
RUN git clone https://github.com/chriskohlhoff/asio.git \
&& cd asio/asio \
&& ./autogen.sh \
&& ./configure --prefix=/usr/local --without-boost \
CXXFLAGS="-stdlib=libc++" LDFLAGS="-stdlib=libc++ -lc++abi" \
&& make install
# amqp-cpp
RUN git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git \
&& cd AMQP-CPP && mkdir build && cd build \
&& cmake \
-DCMAKE_INSTALL_PREFIX=/usr/local \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_CXX_FLAGS=" -stdlib=libc++" \
-DCMAKE_EXE_LINKER_FLAGS=" -lc++abi" \
.. \
&& make install
# jsoncpp
RUN git clone https://github.com/open-source-parsers/jsoncpp.git \
&& cd jsoncpp && mkdir build && cd build \
&& cmake \
-DCMAKE_INSTALL_PREFIX=/usr/local \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_CXX_FLAGS=" -stdlib=libc++" \
-DCMAKE_EXE_LINKER_FLAGS=" -lc++abi" \
.. \
&& make install \
&& mkdir /usr/local/include/jsoncpp \
&& cp -r /usr/local/include/json /usr/local/include/jsoncpp/
# hana (workaround branch)
RUN git clone -b bugfix/constexpr_arrays https://github.com/ricejasonf/hana.git \
&& cd hana && mkdir build && cd build \
&& cmake \
-DCMAKE_INSTALL_PREFIX=/usr/local \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_CXX_FLAGS=" -stdlib=libc++" \
-DCMAKE_EXE_LINKER_FLAGS=" -lc++abi" \
.. \
&& make install
# nbdl
RUN git clone https://github.com/ricejasonf/nbdl.git \
&& cd nbdl && mkdir build && cd build \
&& cmake \
-DCMAKE_INSTALL_PREFIX=/usr/local \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_CXX_FLAGS=" -stdlib=libc++" \
-DCMAKE_EXE_LINKER_FLAGS=" -lc++abi" \
.. \
&& make install
#include <amqpcpp.h>
#include <asio.hpp>
#include <iostream>
#include <memory>
#include <nbdl.hpp>
#include <nbdl/binder/jsoncpp.hpp>
using asio::ip::tcp;
namespace amqp
{
constexpr int no_flags = 0;
namespace detail
{
class connection_handler : public AMQP::ConnectionHandler
{
std::string host;
std::string port;
/**
* Method that is called by the AMQP library every time it has data
* available that should be sent to RabbitMQ.
* @param connection pointer to the main connection object
* @param data memory buffer with the data that should be sent to RabbitMQ
* @param size size of the buffer
*/
virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
{
asio::async_write(
socket,
asio::const_buffer(data, size),
[](asio::error_code const& ec, std::size_t)
{
if (ec)
{
std::cerr << "\nERROR: Socket write failed: " << ec.message() << '\n';
}
}
);
}
/**
* Method that is called by the AMQP library when the login attempt
* succeeded. After this method has been called, the connection is ready
* to use.
* @param connection The connection that can now be used
*/
virtual void onConnected(AMQP::Connection *connection)
{
std::cout << "onConnected\n";
}
/**
* Method that is called by the AMQP library when a fatal error occurs
* on the connection, for example because data received from RabbitMQ
* could not be recognized.
* @param connection The connection on which the error occured
* @param message A human readable error message
*/
virtual void onError(AMQP::Connection *connection, const char *message)
{
std::cerr << "\nRabbitMQ ERROR: " << message << '\n';
}
/**
* Method that is called when the connection was closed. This is the
* counter part of a call to Connection::close() and it confirms that the
* connection was correctly closed.
*
* @param connection The connection that was closed and that is now unusable
*/
virtual void onClosed(AMQP::Connection *connection)
{ }
public:
tcp::socket socket;
connection_handler(asio::io_service& io, std::string const& host, std::string const& port)
: host(host)
, port(port)
, socket(tcp::socket(io))
{
connect();
}
void connect()
{
asio::connect(
socket,
tcp::resolver(socket.get_io_service()).resolve(tcp::resolver::query(host, port))
);
}
};
class connection_impl : public std::enable_shared_from_this<connection_impl>
{
tcp::socket &socket;
char buffer[1024];
public:
// just expose the impl layer
AMQP::Connection amqp_impl;
connection_impl(connection_handler* handler)
: socket(handler->socket)
, amqp_impl(handler, AMQP::Login("guest", "guest"), "/")
, buffer()
{ }
// prevent copy because we are holding a reference
connection_impl(connection_impl const&) = delete;
void keep_reading()
{
auto self(shared_from_this());
socket.async_read_some(
asio::mutable_buffer(buffer, sizeof(buffer)),
[this, self](asio::error_code const& ec, std::size_t length)
{
if (!ec)
{
// send the received data to the amqp layer
amqp_impl.parse(buffer, length);
keep_reading();
}
else
{
std::cerr << "\nERROR: Socket read failed: " << ec.message() << '\n';
}
}
);
}
};
}
class client
{
detail::connection_handler handler;
std::shared_ptr<detail::connection_impl> impl;
public:
client(asio::io_service& io, std::string const& host, std::string const& port)
: handler(io, host, port)
, impl(new detail::connection_impl(&handler))
{
impl->keep_reading();
}
client(client const&) = delete;
AMQP::Connection& amqp()
{
return impl->amqp_impl;
}
};
}
int main()
{
asio::io_service io;
amqp::client test_pusher(io, "127.0.0.1", "5672");
AMQP::Channel publish_channel(&(test_pusher.amqp()));
amqp::client client(io, "127.0.0.1", "5672");
auto& amqp = client.amqp();
AMQP::Channel consumer_channel(&amqp);
consumer_channel.declareExchange("my_exchange", AMQP::fanout);
consumer_channel.declareQueue("")
.onSuccess([&consumer_channel](std::string const& name, uint32_t, uint32_t)
{
consumer_channel.bindQueue("my_exchange", name, "my_key");
});
consumer_channel.consume("")
.onMessage([](AMQP::Message const& msg, uint64_t, bool)
{
std::cout << "RECEIVED MESSAGE: " << msg.message() << "\n";
})
.onSuccess([&publish_channel]()
{
std::cout << "consuming successfully\n";
publish_channel.publish("my_exchange", "my_key", "Hello, Message!");
})
.onError([](char const* err)
{
std::cerr << "ERROR: " << err << '\n';
});
io.run();
}
main:
clang++ -std=c++14 -stdlib=libc++ main.cpp -lpthread -ljsoncpp -lamqp-cpp -lc++abi -DASIO_STANDALONE
run: main
./a.out
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment