Skip to content

Instantly share code, notes, and snippets.

@jamboree
Last active November 30, 2016 02:24
Show Gist options
  • Save jamboree/b0ce21f0db6a1f4b947197d66356dc9e to your computer and use it in GitHub Desktop.
Save jamboree/b0ce21f0db6a1f4b947197d66356dc9e to your computer and use it in GitHub Desktop.
#include <thread>
#include <iostream>
#include <atomic>
#include <boost/asio/io_service.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/utility/string_ref.hpp>
namespace asio = boost::asio;
template<class Socket, class Timer>
class timed_receiver
{
struct state
{
std::atomic_flag flag = ATOMIC_FLAG_INIT;
std::atomic<bool> receiving{false};
std::function<void(boost::system::error_code)> callback;
void complete(boost::system::error_code ec)
{
if (flag.test_and_set(std::memory_order::memory_order_relaxed))
return;
auto call(std::move(callback));
call(ec);
}
};
Socket& _sock;
Timer _timer;
std::shared_ptr<state> _state;
public:
timed_receiver(Socket& sock)
: _sock(sock)
, _timer(sock.get_io_service())
, _state(std::make_shared<state>())
{
sock.non_blocking(true);
}
template<class MutableBufferSequence, class Duration, class Handler>
void async_receive_from(MutableBufferSequence const& buffers, typename Socket::endpoint_type& endpoint, Duration d, Handler h)
{
_timer.expires_from_now(d);
_state->callback = [=, &endpoint, h = std::move(h)](boost::system::error_code ec)
{
std::size_t len = 0;
if (!ec)
len = _sock.receive_from(buffers, endpoint, 0, ec);
h(ec, len);
};
_state->flag.clear();
_timer.async_wait([s = std::weak_ptr<state>(_state)](boost::system::error_code ec)
{
if (ec == asio::error::operation_aborted)
return;
if (auto state = s.lock())
state->complete(asio::error::timed_out);
});
if (!_state->receiving.exchange(true, std::memory_order::memory_order_relaxed))
{
_sock.async_receive_from(asio::null_buffers(), endpoint, [s = std::weak_ptr<state>(_state)](boost::system::error_code ec, std::size_t)
{
if (auto state = s.lock())
{
state->receiving.store(false, std::memory_order::memory_order_relaxed);
state->complete(ec);
}
});
}
}
template<class MutableBufferSequence, class Duration>
std::size_t receive_from(MutableBufferSequence const& buffers, typename Socket::endpoint_type& endpoint, Duration d, boost::system::error_code& ec_ret)
{
std::mutex mtx;
std::condition_variable cond;
bool not_ready = true;
std::size_t len_ret;
async_receive_from(buffers, endpoint, d, [&](boost::system::error_code ec, std::size_t len)
{
{
std::unique_lock<std::mutex> lock(mtx);
ec_ret = ec;
len_ret = len;
not_ready = false;
}
cond.notify_one();
});
std::unique_lock<std::mutex> lock(mtx);
while (not_ready)
cond.wait(lock);
return len_ret;
}
void cancel()
{
if (_state->flag.test_and_set(std::memory_order::memory_order_relaxed))
return;
_sock.get_io_service().post([call = std::move(_state->callback)]
{
call(asio::error::operation_aborted);
});
}
~timed_receiver()
{
cancel();
}
};
int main(int argc, char* argv[])
{
asio::io_service io;
if (argc > 1) // client
{
asio::ip::udp::socket sock(io, asio::ip::udp::v4());
asio::ip::udp::endpoint remote(asio::ip::address_v4::loopback(), 8823);
try
{
sock.send_to(asio::buffer(argv[1], std::strlen(argv[1])), remote);
}
catch (const std::exception& e)
{
std::cout << remote << ":" << e.what();
}
}
else // server
{
asio::ip::udp::socket sock(io, asio::ip::udp::endpoint(asio::ip::udp::v4(), 8823));
timed_receiver<asio::ip::udp::socket, asio::steady_timer> recv(sock);
asio::io_service::work work(io);
std::thread thread([&io]() { io.run(); });
char recv_buf[1024];
try
{
for (;;)
{
asio::ip::udp::endpoint remote;
boost::system::error_code ec;
auto recv_length = recv.receive_from(asio::buffer(recv_buf), remote, std::chrono::seconds(5), ec);
if (ec == asio::error::timed_out)
{
std::cout << "time out. Nothing received.\n";
}
else
{
std::cout << "received something: \"" << boost::string_ref(recv_buf, recv_length) << "\"\n";
}
}
}
catch (const std::exception& e)
{
std::cout << e.what();
}
thread.join();
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment