Last active
November 30, 2016 02:24
-
-
Save jamboree/b0ce21f0db6a1f4b947197d66356dc9e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <thread> | |
#include <iostream> | |
#include <atomic> | |
#include <boost/asio/io_service.hpp> | |
#include <boost/asio/steady_timer.hpp> | |
#include <boost/asio/ip/udp.hpp> | |
#include <boost/asio/use_future.hpp> | |
#include <boost/utility/string_ref.hpp> | |
namespace asio = boost::asio; | |
template<class Socket, class Timer> | |
class timed_receiver | |
{ | |
struct state | |
{ | |
std::atomic_flag flag = ATOMIC_FLAG_INIT; | |
std::atomic<bool> receiving{false}; | |
std::function<void(boost::system::error_code)> callback; | |
void complete(boost::system::error_code ec) | |
{ | |
if (flag.test_and_set(std::memory_order::memory_order_relaxed)) | |
return; | |
auto call(std::move(callback)); | |
call(ec); | |
} | |
}; | |
Socket& _sock; | |
Timer _timer; | |
std::shared_ptr<state> _state; | |
public: | |
timed_receiver(Socket& sock) | |
: _sock(sock) | |
, _timer(sock.get_io_service()) | |
, _state(std::make_shared<state>()) | |
{ | |
sock.non_blocking(true); | |
} | |
template<class MutableBufferSequence, class Duration, class Handler> | |
void async_receive_from(MutableBufferSequence const& buffers, typename Socket::endpoint_type& endpoint, Duration d, Handler h) | |
{ | |
_timer.expires_from_now(d); | |
_state->callback = [=, &endpoint, h = std::move(h)](boost::system::error_code ec) | |
{ | |
std::size_t len = 0; | |
if (!ec) | |
len = _sock.receive_from(buffers, endpoint, 0, ec); | |
h(ec, len); | |
}; | |
_state->flag.clear(); | |
_timer.async_wait([s = std::weak_ptr<state>(_state)](boost::system::error_code ec) | |
{ | |
if (ec == asio::error::operation_aborted) | |
return; | |
if (auto state = s.lock()) | |
state->complete(asio::error::timed_out); | |
}); | |
if (!_state->receiving.exchange(true, std::memory_order::memory_order_relaxed)) | |
{ | |
_sock.async_receive_from(asio::null_buffers(), endpoint, [s = std::weak_ptr<state>(_state)](boost::system::error_code ec, std::size_t) | |
{ | |
if (auto state = s.lock()) | |
{ | |
state->receiving.store(false, std::memory_order::memory_order_relaxed); | |
state->complete(ec); | |
} | |
}); | |
} | |
} | |
template<class MutableBufferSequence, class Duration> | |
std::size_t receive_from(MutableBufferSequence const& buffers, typename Socket::endpoint_type& endpoint, Duration d, boost::system::error_code& ec_ret) | |
{ | |
std::mutex mtx; | |
std::condition_variable cond; | |
bool not_ready = true; | |
std::size_t len_ret; | |
async_receive_from(buffers, endpoint, d, [&](boost::system::error_code ec, std::size_t len) | |
{ | |
{ | |
std::unique_lock<std::mutex> lock(mtx); | |
ec_ret = ec; | |
len_ret = len; | |
not_ready = false; | |
} | |
cond.notify_one(); | |
}); | |
std::unique_lock<std::mutex> lock(mtx); | |
while (not_ready) | |
cond.wait(lock); | |
return len_ret; | |
} | |
void cancel() | |
{ | |
if (_state->flag.test_and_set(std::memory_order::memory_order_relaxed)) | |
return; | |
_sock.get_io_service().post([call = std::move(_state->callback)] | |
{ | |
call(asio::error::operation_aborted); | |
}); | |
} | |
~timed_receiver() | |
{ | |
cancel(); | |
} | |
}; | |
int main(int argc, char* argv[]) | |
{ | |
asio::io_service io; | |
if (argc > 1) // client | |
{ | |
asio::ip::udp::socket sock(io, asio::ip::udp::v4()); | |
asio::ip::udp::endpoint remote(asio::ip::address_v4::loopback(), 8823); | |
try | |
{ | |
sock.send_to(asio::buffer(argv[1], std::strlen(argv[1])), remote); | |
} | |
catch (const std::exception& e) | |
{ | |
std::cout << remote << ":" << e.what(); | |
} | |
} | |
else // server | |
{ | |
asio::ip::udp::socket sock(io, asio::ip::udp::endpoint(asio::ip::udp::v4(), 8823)); | |
timed_receiver<asio::ip::udp::socket, asio::steady_timer> recv(sock); | |
asio::io_service::work work(io); | |
std::thread thread([&io]() { io.run(); }); | |
char recv_buf[1024]; | |
try | |
{ | |
for (;;) | |
{ | |
asio::ip::udp::endpoint remote; | |
boost::system::error_code ec; | |
auto recv_length = recv.receive_from(asio::buffer(recv_buf), remote, std::chrono::seconds(5), ec); | |
if (ec == asio::error::timed_out) | |
{ | |
std::cout << "time out. Nothing received.\n"; | |
} | |
else | |
{ | |
std::cout << "received something: \"" << boost::string_ref(recv_buf, recv_length) << "\"\n"; | |
} | |
} | |
} | |
catch (const std::exception& e) | |
{ | |
std::cout << e.what(); | |
} | |
thread.join(); | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment