Skip to content

Instantly share code, notes, and snippets.

@edouarda
Last active February 15, 2018 10:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save edouarda/6c3b18d67d2dc3e035a2376ba63aa481 to your computer and use it in GitHub Desktop.
Save edouarda/6c3b18d67d2dc3e035a2376ba63aa481 to your computer and use it in GitHub Desktop.
//
// chat_server.cpp
// ~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "../bench.hpp"
#include "../chat_message.hpp"
#include <boost/asio.hpp>
#include <boost/lockfree/queue.hpp>
#include <algorithm>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstddef>
#include <cstdlib>
#include <deque>
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <new>
#include <set>
#include <thread>
#include <utility>
#include <vector>
using boost::asio::ip::tcp;
//----------------------------------------------------------------------
typedef std::deque<chat_message> chat_message_queue;
//----------------------------------------------------------------------
//----------------------------------------------------------------------
template <typename SessionPtr>
class chat_room
{
public:
void join(SessionPtr participant)
{
std::unique_lock<std::mutex> lock{_fat_mutex};
participants_.insert(participant);
for (auto msg : recent_msgs_)
participant->deliver(msg);
}
void leave(SessionPtr participant)
{
std::unique_lock<std::mutex> lock{_fat_mutex};
participants_.erase(participant);
}
void deliver(const chat_message & msg)
{
std::unique_lock<std::mutex> lock{_fat_mutex};
recent_msgs_.push_back(msg);
while (recent_msgs_.size() > max_recent_msgs)
recent_msgs_.pop_front();
for (auto participant : participants_)
participant->deliver(msg);
}
private:
std::mutex _fat_mutex;
std::set<SessionPtr> participants_;
enum
{
max_recent_msgs = 100
};
chat_message_queue recent_msgs_;
};
//----------------------------------------------------------------------
class chat_session;
using chat_session_ptr = chat_session *;
class chat_session
{
private:
using recycle_function_type = std::function<void(chat_session_ptr)>;
public:
chat_session(tcp::socket socket,
chat_room<chat_session_ptr> & room,
recycle_function_type recycle)
: socket_(std::move(socket))
, room_(room)
, recycle_{recycle}
{}
void start()
{
room_.join(this);
do_read_header();
}
void deliver(const chat_message & msg)
{
bool write_in_progress = !write_msgs_.empty();
write_msgs_.push_back(msg);
if (!write_in_progress)
{
do_write();
}
}
private:
void do_read_header()
{
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.data(), chat_message::header_length),
[this](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec && read_msg_.decode_header())
{
do_read_body();
}
else
{
room_.leave(this);
recycle_(this);
}
});
}
void do_read_body()
{
boost::asio::async_read(socket_,
boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
[this](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec)
{
room_.deliver(read_msg_);
do_read_header();
}
else
{
room_.leave(this);
recycle_(this);
}
});
}
void do_write()
{
boost::asio::async_write(socket_,
boost::asio::buffer(
write_msgs_.front().data(), write_msgs_.front().length()),
[this](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec)
{
write_msgs_.pop_front();
if (!write_msgs_.empty())
{
do_write();
}
}
else
{
room_.leave(this);
recycle_(this);
}
});
}
public:
tcp::socket & socket() noexcept
{
return socket_;
}
private:
tcp::socket socket_;
chat_room<chat_session_ptr> & room_;
recycle_function_type recycle_;
chat_message read_msg_;
chat_message_queue write_msgs_;
};
//----------------------------------------------------------------------
class partition
{
private:
static constexpr size_t partition_size = 100;
void init_sessions()
{
_all_sessions.resize(partition_size);
std::generate(_all_sessions.begin(), _all_sessions.end(), [this] {
chat_session_ptr s =
new chat_session{boost::asio::ip::tcp::socket{_context}, _room,
[this](chat_session_ptr p) { this->recycle(p); }};
_free_sessions.bounded_push(s);
return s;
});
}
void clear()
{
_free_sessions.consume_all([](auto s) {});
for (chat_session_ptr s : _all_sessions)
{
boost::system::error_code ec;
s->socket().shutdown(
boost::asio::ip::tcp::socket::shutdown_both, ec);
s->socket().close(ec);
delete s;
}
_all_sessions.clear();
}
public:
partition(chat_room<chat_session_ptr> & room)
: _room{room}
{
init_sessions();
_worker = std::make_unique<boost::asio::io_context::work>(_context);
_runner_thread = std::thread{std::bind(&partition::runner, this)};
}
~partition()
{
if (_worker)
{
_worker.reset();
_context.stop();
}
clear();
_runner_thread.join();
}
public:
chat_session_ptr next()
{
chat_session_ptr s;
while (!_free_sessions.pop(s))
{
std::this_thread::sleep_for(std::chrono::milliseconds{1});
}
return s;
}
void recycle(chat_session_ptr s) noexcept
{
s->~chat_session();
s = new (s) chat_session{boost::asio::ip::tcp::socket{_context}, _room,
[this](chat_session_ptr p) { this->recycle(p); }};
_free_sessions.bounded_push(s);
}
private:
void runner()
{
boost::system::error_code ec;
while (!ec)
{
_context.run(ec);
}
}
private:
chat_room<chat_session_ptr> & _room;
std::thread _runner_thread;
std::unique_ptr<boost::asio::io_context::work> _worker;
boost::asio::io_context _context{1};
using free_queue_type = boost::lockfree::queue<chat_session_ptr,
boost::lockfree::capacity<partition_size>>;
std::vector<chat_session_ptr> _all_sessions;
free_queue_type _free_sessions;
};
class partitions_list
{
public:
partitions_list(size_t c, chat_room<chat_session_ptr> & room)
{
for (size_t i = 0; i < c; ++i)
{
_partitions.push_back(std::make_unique<partition>(room));
}
}
public:
chat_session_ptr next()
{
if (_index >= _partitions.size())
{
_index = 0;
}
return _partitions[_index++]->next();
}
private:
size_t _index{0};
std::vector<std::unique_ptr<partition>> _partitions;
};
class chat_server
{
public:
chat_server(boost::asio::io_context & io_context,
const tcp::endpoint & endpoint,
size_t count)
: acceptor_(io_context, endpoint)
, _partitions{count, room_}
{
do_accept();
}
private:
void do_accept()
{
auto session = _partitions.next();
if (!session) return;
acceptor_.async_accept(
session->socket(), [this, session](boost::system::error_code ec) {
measure_block b{"accept", _stats};
if (ec
== make_error_code(boost::system::errc::operation_canceled))
{
return;
}
if (!ec)
{
session->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
chat_room<chat_session_ptr> room_;
partitions_list _partitions;
measure_stats _stats;
};
//----------------------------------------------------------------------
namespace
{
// Parses `text` as a base-10 integer. Returns false, leaving `out` untouched,
// if the text is empty, has trailing characters or does not fit in a long.
bool parse_long(const char * text, long & out)
{
    errno = 0;
    char * end = nullptr;
    const long value = std::strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE)
    {
        return false;
    }
    out = value;
    return true;
}
} // namespace

// Usage: chat_server <port> <threads>
// Returns 0 on a clean exit and 1 on bad arguments or an unhandled exception.
int main(int argc, char * argv[])
{
    try
    {
        if (argc != 3)
        {
            std::cerr << "Usage: chat_server <port> <threads>\n";
            return 1;
        }
        long port = 0;
        if (!parse_long(argv[1], port) || port < 0 || port > 65535)
        {
            std::cerr << "Invalid port: " << argv[1] << "\n";
            return 1;
        }
        long threads = 0;
        if (!parse_long(argv[2], threads) || threads < 1)
        {
            std::cerr << "Invalid thread count: " << argv[2] << "\n";
            return 1;
        }
        boost::asio::io_context io_context{1};
        tcp::endpoint endpoint(tcp::v4(), static_cast<unsigned short>(port));
        chat_server server{io_context, endpoint, static_cast<size_t>(threads)};
        io_context.run();
    }
    catch (const std::exception & e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
        return 1;
    }
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment