Last active
February 15, 2018 10:38
-
-
Save edouarda/6c3b18d67d2dc3e035a2376ba63aa481 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// chat_server.cpp | |
// ~~~~~~~~~~~~~~~ | |
// | |
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
#include "../bench.hpp" | |
#include "../chat_message.hpp" | |
#include <boost/asio.hpp> | |
#include <cstdlib> | |
#include <deque> | |
#include <iostream> | |
#include <list> | |
#include <memory> | |
#include <set> | |
#include <utility> | |
#include <boost/lockfree/queue.hpp> | |
using boost::asio::ip::tcp; | |
//---------------------------------------------------------------------- | |
typedef std::deque<chat_message> chat_message_queue; | |
//---------------------------------------------------------------------- | |
//---------------------------------------------------------------------- | |
template <typename SessionPtr> | |
class chat_room | |
{ | |
public: | |
void join(SessionPtr participant) | |
{ | |
std::unique_lock<std::mutex> lock{_fat_mutex}; | |
participants_.insert(participant); | |
for (auto msg : recent_msgs_) | |
participant->deliver(msg); | |
} | |
void leave(SessionPtr participant) | |
{ | |
std::unique_lock<std::mutex> lock{_fat_mutex}; | |
participants_.erase(participant); | |
} | |
void deliver(const chat_message & msg) | |
{ | |
std::unique_lock<std::mutex> lock{_fat_mutex}; | |
recent_msgs_.push_back(msg); | |
while (recent_msgs_.size() > max_recent_msgs) | |
recent_msgs_.pop_front(); | |
for (auto participant : participants_) | |
participant->deliver(msg); | |
} | |
private: | |
std::mutex _fat_mutex; | |
std::set<SessionPtr> participants_; | |
enum | |
{ | |
max_recent_msgs = 100 | |
}; | |
chat_message_queue recent_msgs_; | |
}; | |
//---------------------------------------------------------------------- | |
class chat_session; | |
using chat_session_ptr = chat_session *; | |
class chat_session | |
{ | |
private: | |
using recycle_function_type = std::function<void(chat_session_ptr)>; | |
public: | |
chat_session(tcp::socket socket, | |
chat_room<chat_session_ptr> & room, | |
recycle_function_type recycle) | |
: socket_(std::move(socket)) | |
, room_(room) | |
, recycle_{recycle} | |
{} | |
void start() | |
{ | |
room_.join(this); | |
do_read_header(); | |
} | |
void deliver(const chat_message & msg) | |
{ | |
bool write_in_progress = !write_msgs_.empty(); | |
write_msgs_.push_back(msg); | |
if (!write_in_progress) | |
{ | |
do_write(); | |
} | |
} | |
private: | |
void do_read_header() | |
{ | |
boost::asio::async_read(socket_, | |
boost::asio::buffer(read_msg_.data(), chat_message::header_length), | |
[this](boost::system::error_code ec, std::size_t /*length*/) { | |
if (!ec && read_msg_.decode_header()) | |
{ | |
do_read_body(); | |
} | |
else | |
{ | |
room_.leave(this); | |
recycle_(this); | |
} | |
}); | |
} | |
void do_read_body() | |
{ | |
boost::asio::async_read(socket_, | |
boost::asio::buffer(read_msg_.body(), read_msg_.body_length()), | |
[this](boost::system::error_code ec, std::size_t /*length*/) { | |
if (!ec) | |
{ | |
room_.deliver(read_msg_); | |
do_read_header(); | |
} | |
else | |
{ | |
room_.leave(this); | |
recycle_(this); | |
} | |
}); | |
} | |
void do_write() | |
{ | |
boost::asio::async_write(socket_, | |
boost::asio::buffer( | |
write_msgs_.front().data(), write_msgs_.front().length()), | |
[this](boost::system::error_code ec, std::size_t /*length*/) { | |
if (!ec) | |
{ | |
write_msgs_.pop_front(); | |
if (!write_msgs_.empty()) | |
{ | |
do_write(); | |
} | |
} | |
else | |
{ | |
room_.leave(this); | |
recycle_(this); | |
} | |
}); | |
} | |
public: | |
tcp::socket & socket() noexcept | |
{ | |
return socket_; | |
} | |
private: | |
tcp::socket socket_; | |
chat_room<chat_session_ptr> & room_; | |
recycle_function_type recycle_; | |
chat_message read_msg_; | |
chat_message_queue write_msgs_; | |
}; | |
//---------------------------------------------------------------------- | |
class partition | |
{ | |
private: | |
static constexpr size_t partition_size = 100; | |
void init_sessions() | |
{ | |
_all_sessions.resize(partition_size); | |
std::generate(_all_sessions.begin(), _all_sessions.end(), [this] { | |
chat_session_ptr s = | |
new chat_session{boost::asio::ip::tcp::socket{_context}, _room, | |
[this](chat_session_ptr p) { this->recycle(p); }}; | |
_free_sessions.bounded_push(s); | |
return s; | |
}); | |
} | |
void clear() | |
{ | |
_free_sessions.consume_all([](auto s) {}); | |
for (chat_session_ptr s : _all_sessions) | |
{ | |
boost::system::error_code ec; | |
s->socket().shutdown( | |
boost::asio::ip::tcp::socket::shutdown_both, ec); | |
s->socket().close(ec); | |
delete s; | |
} | |
_all_sessions.clear(); | |
} | |
public: | |
partition(chat_room<chat_session_ptr> & room) | |
: _room{room} | |
{ | |
init_sessions(); | |
_worker = std::make_unique<boost::asio::io_context::work>(_context); | |
_runner_thread = std::thread{std::bind(&partition::runner, this)}; | |
} | |
~partition() | |
{ | |
if (_worker) | |
{ | |
_worker.reset(); | |
_context.stop(); | |
} | |
clear(); | |
_runner_thread.join(); | |
} | |
public: | |
chat_session_ptr next() | |
{ | |
chat_session_ptr s; | |
while (!_free_sessions.pop(s)) | |
{ | |
std::this_thread::sleep_for(std::chrono::milliseconds{1}); | |
} | |
return s; | |
} | |
void recycle(chat_session_ptr s) noexcept | |
{ | |
s->~chat_session(); | |
s = new (s) chat_session{boost::asio::ip::tcp::socket{_context}, _room, | |
[this](chat_session_ptr p) { this->recycle(p); }}; | |
_free_sessions.bounded_push(s); | |
} | |
private: | |
void runner() | |
{ | |
boost::system::error_code ec; | |
while (!ec) | |
{ | |
_context.run(ec); | |
} | |
} | |
private: | |
chat_room<chat_session_ptr> & _room; | |
std::thread _runner_thread; | |
std::unique_ptr<boost::asio::io_context::work> _worker; | |
boost::asio::io_context _context{1}; | |
using free_queue_type = boost::lockfree::queue<chat_session_ptr, | |
boost::lockfree::capacity<partition_size>>; | |
std::vector<chat_session_ptr> _all_sessions; | |
free_queue_type _free_sessions; | |
}; | |
class partitions_list | |
{ | |
public: | |
partitions_list(size_t c, chat_room<chat_session_ptr> & room) | |
{ | |
for (size_t i = 0; i < c; ++i) | |
{ | |
_partitions.push_back(std::make_unique<partition>(room)); | |
} | |
} | |
public: | |
chat_session_ptr next() | |
{ | |
if (_index >= _partitions.size()) | |
{ | |
_index = 0; | |
} | |
return _partitions[_index++]->next(); | |
} | |
private: | |
size_t _index{0}; | |
std::vector<std::unique_ptr<partition>> _partitions; | |
}; | |
class chat_server | |
{ | |
public: | |
chat_server(boost::asio::io_context & io_context, | |
const tcp::endpoint & endpoint, | |
size_t count) | |
: acceptor_(io_context, endpoint) | |
, _partitions{count, room_} | |
{ | |
do_accept(); | |
} | |
private: | |
void do_accept() | |
{ | |
auto session = _partitions.next(); | |
if (!session) return; | |
acceptor_.async_accept( | |
session->socket(), [this, session](boost::system::error_code ec) { | |
measure_block b{"accept", _stats}; | |
if (ec | |
== make_error_code(boost::system::errc::operation_canceled)) | |
{ | |
return; | |
} | |
if (!ec) | |
{ | |
session->start(); | |
} | |
do_accept(); | |
}); | |
} | |
tcp::acceptor acceptor_; | |
chat_room<chat_session_ptr> room_; | |
partitions_list _partitions; | |
measure_stats _stats; | |
}; | |
//---------------------------------------------------------------------- | |
int main(int argc, char * argv[]) | |
{ | |
try | |
{ | |
if (argc != 3) | |
{ | |
std::cerr << "Usage: chat_server <port> <threads>\n"; | |
return 1; | |
} | |
boost::asio::io_context io_context{1}; | |
tcp::endpoint endpoint(tcp::v4(), std::atoi(argv[1])); | |
chat_server server{ | |
io_context, endpoint, static_cast<size_t>(std::atol(argv[2]))}; | |
io_context.run(); | |
} | |
catch (std::exception & e) | |
{ | |
std::cerr << "Exception: " << e.what() << "\n"; | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment