Skip to content

Instantly share code, notes, and snippets.

@bjering
Created August 23, 2010 17:26
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save bjering/25aa623f0df970b7282d to your computer and use it in GitHub Desktop.
Save bjering/25aa623f0df970b7282d to your computer and use it in GitHub Desktop.
Performance benchmark chat system - definitions and protocol
Definitions
User - Identified by a unique username, a user is either offline or online.
Channel - Identified by a unique name, a channel has a fixed set of users as members, what users are offline or online is not fixed.
Protocol
Client -> Server
login [username] - logs in a user, a user never expicitly logs off, but implicitly does so when the connection that made the logon call is disconnected.
say [channelname] [message] - say a message in a channel
ping [id] - request a pong
Server -> Client
join [channelname] [username] - a user comes online (send for every channel the user is a member of)
leave [channelname] [username]- a user goes offline (send for every channel the user is a member of)
pong [id] - sent as a response to a ping, using the same id
message [channelname] [username] [message] - sent on every new message in a channel the user is member of
error [message] - sent to inform the client of any kind of error
resume - sent as response to a a login if that user was already logged in on another session, that old session will have been closed.
(ns chat.channel
(:use
[clojure.contrib.def]
[chat.common]))
(defvar channels (agent {}))
(defn- list-users
[channel]
(map #(% :user-name) (@channel :users)))
(defn- write
[channel message]
(doseq [user (@channel :users)]
(user :send :write message))
nil)
(defn- say
[channel user-name message]
(write channel (str "message " (@channel :channel-name) " " user-name " " message)))
(defn- user-join
[channel user-name]
(write channel (str "join " (@channel :channel-name) " " user-name)))
(defn- leave
[channel user-name]
(write channel (str "leave " (@channel :channel-name) " " user-name)))
(defn dispatcher
[state message & args]
(cond
(= message :agent) state
(= message :state) @state
(= message :channel-name) (@state :channel-name)
(= message :list-users) (apply list-users state args)
(= message :join) (apply user-join state args)
(= message :leave) (apply leave state args)
(= message :say) (apply say state args)
(= message :send)
(do
(apply chat.common/send-message state args)
(partial dispatcher state))
:else (throw (IllegalArgumentException.
(str "unknown message in [" "channel" "] " message)))))
(defn- add-user
[channel user]
(conj channel {:users (conj (channel :users) user)}))
(defn create
[channel-name]
(let
[
new-channel
(partial dispatcher
(agent
{
:channel-name channel-name
:users '()
:add-user add-user
}))]
(send channels conj {channel-name new-channel})
new-channel))
(defn delete-all
[]
(defvar channels (agent {})))
(ns chat.common)
(defn send-message
[agt message-name & args]
;(println "Sending " message-name " to " agt " with args = " args)
(send agt #(apply (% message-name) % args)))
; I realize this are two unsyncronized agents and
; for a short while it could be that a user
; is added to the channel, but not the channel
; to the user, and vice versa.
; This is ok!
(defn add-user-to-channel
[user channel]
(user :send :add-channel channel)
(channel :send :add-user user))
(defn login
[session user]
(session :send :set-user user)
(user :send :set-session session)
nil)
(set! *warn-on-reflection* true)
(ns chat.main
(:require
[chat.common]
[chat.channel]
[chat.server]
[chat.user]
))
(defmethod print-method clojure.lang.IDeref [o w] (.write w (format "#<%s@%x>" (.getSimpleName (class o)) (System/identityHashCode o))))
(def jonas (chat.user/create "Jonas"))
(def marcus (chat.user/create "Marcus"))
(def lobby (chat.channel/create "Lobby"))
(def channel-numbers (range 5000))
(def user-numbers (range 10))
(defn channel-name
[channel-number]
(str "channel_" (format "%05d" channel-number)))
(defn user-name
[channel-number user-number]
(str
"stressbot_"
(format "%05d" channel-number)
"_"
(format "%03d" user-number)))
(doseq
[channel-number channel-numbers]
(let
[channel (chat.channel/create (channel-name channel-number))]
(doseq [user-number user-numbers]
(let
[user (chat.user/create (user-name channel-number user-number))]
(chat.common/add-user-to-channel user channel)))))
(chat.common/add-user-to-channel jonas lobby)
(chat.common/add-user-to-channel marcus lobby)
(def channel-00000 (@chat.channel/channels "channel_00000"))
(chat.common/add-user-to-channel jonas channel-00000)
(chat.common/add-user-to-channel marcus channel-00000)
;(lobby :send :write ((jonas :state) :user-name) "Hej")
(def server (chat.server/create 4711))
#include <iostream>
#include <iomanip>
#include <string>
#include <sstream>
#include <vector>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#define PORT 4711
class Client
{
public:
Client(boost::asio::io_service& io, const std::string& name, const std::string& channelName)
: timer_(io), socket_(io), strand_(io), name_(name), channelName_(channelName), counter_(0)
{
}
void startConnect(const std::string& host)
{
// std::cout << "start connect" << std::endl;
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(host), PORT);
socket_.async_connect(endpoint, boost::bind(&Client::onConnect, this, boost::asio::placeholders::error));
}
private:
void onConnect(const boost::system::error_code& error)
{
if (error == boost::asio::error::fd_set_failure)
{
std::cout << "From onConnect" << std::endl;
throw boost::system::system_error(error);
}
else if (error == boost::asio::error::no_buffer_space)
{
std::cout << "From onConnect" << std::endl;
throw boost::system::system_error(error);
}
startRead();
startWrite("login " + name_);
startBehavior();
}
void startRead()
{
boost::asio::async_read_until(socket_, readBuf_, '\0', strand_.wrap(boost::bind(&Client::onReadComplete, this, boost::asio::placeholders::error)));
}
void Client::onReadComplete(boost::system::error_code error)
{
if (error == boost::asio::error::connection_reset // Client disconnected hard
|| error == boost::asio::error::operation_aborted // Server-side closed socket while doing async_read?
// || error.value() == 1236) // Windows specific hack - Might be needed? Used to be in old code.
|| error == boost::asio::error::eof) // The other side closed the socket
{
std::cout << "From onReadComplete" << std::endl;
throw boost::system::system_error(error);
}
parse();
startRead();
}
void Client::parse()
{
std::istream in(&readBuf_);
std::string line;
std::getline(in, line);
}
void startWrite(const std::string& message)
{
writeBuf_ = message; // copy to ensure writeBuf_.c_str() buffer is legal during transmission.
boost::asio::async_write(socket_, boost::asio::buffer(writeBuf_.c_str(), writeBuf_.size() + 1), strand_.wrap(boost::bind(&Client::onWriteComplete, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
}
void onWriteComplete(const boost::system::error_code& error, size_t bytesTransferred)
{
if (error)
{
std::cout << "From onWriteComplete" << std::endl;
throw boost::system::system_error(error);
}
}
void startBehavior()
{
timer_.expires_from_now(boost::posix_time::milliseconds(static_cast<int>((10000.0 * std::rand() / RAND_MAX))));
timer_.async_wait(strand_.wrap(boost::bind(&Client::onBehavior, this, boost::asio::placeholders::error)));
}
void onBehavior(const boost::system::error_code& error)
{
if (error)
{
std::cout << "From onBehavior" << std::endl;
throw boost::system::system_error(error);
}
std::ostringstream out;
out << "say " << channelName_ << " Game is unbalanced, nerf the $$$-items NOW! - [" << counter_++ << "]";
startWrite(out.str());
startBehavior();
}
boost::asio::deadline_timer timer_;
boost::asio::ip::tcp::socket socket_;
boost::asio::strand strand_;
std::string name_;
std::string channelName_;
int counter_;
std::string writeBuf_;
boost::asio::streambuf readBuf_;
};
void runIoAndReportExceptions(boost::asio::io_service* io)
{
try
{
io->run();
}
catch (boost::system::system_error& error)
{
std::cout << error.what();
}
catch (std::exception& error)
{
std::cout << error.what();
}
catch (...)
{
std::cout << "Unknown exception";
}
}
class Simulation
{
public:
Simulation(boost::asio::io_service& io, const std::string& host, int startChannel, int endChannel, int usersPerChannel, int launchDelayInMs)
: io_(io), host_(host), timer_(io), startChannel_(startChannel), endChannel_(endChannel), usersPerChannel_(usersPerChannel), launchDelayInMs_(launchDelayInMs)
{
std::cout << "Simulation created" << std::endl;
timer_.expires_from_now(boost::posix_time::milliseconds(1000));
timer_.async_wait(boost::bind(&Simulation::createClients, this, boost::asio::placeholders::error));
}
private:
void createClients(const boost::system::error_code& error)
{
if (error)
throw boost::system::system_error(error);
std::cout << "Creating clients..." << std::endl;
for (int channelNo = startChannel_; channelNo < endChannel_; ++channelNo)
{
std::ostringstream channelName;
channelName << "channel_" << std::setfill('0') << std::setw(5) << channelNo;
for (int userNo = 0; userNo < usersPerChannel_; ++userNo)
{
std::ostringstream name;
name << "stressbot_" << std::setfill('0') << std::setw(5) << channelNo << "_" << std::setw(3) << userNo;
clients_.push_back(boost::shared_ptr<Client>(new Client(io_, name.str(), channelName.str())));
}
}
std::cout << "Done Creating clients..." << std::endl;
timer_.expires_from_now(boost::posix_time::milliseconds(1000));
timer_.async_wait(boost::bind(&Simulation::connectClients, this, boost::asio::placeholders::error));
}
void connectClients(const boost::system::error_code& error)
{
if (error)
throw boost::system::system_error(error);
std::cout << "Connecting clients..." << std::endl;
for (int i = 0; i < clients_.size(); ++i)
{
clients_[i]->startConnect(host_);
boost::this_thread::sleep(boost::posix_time::milliseconds(launchDelayInMs_));
}
std::cout << "Done Connecting clients..." << std::endl;
}
boost::asio::io_service& io_;
const std::string host_;
boost::asio::deadline_timer timer_;
int startChannel_;
int endChannel_;
int usersPerChannel_;
int launchDelayInMs_;
std::vector<boost::shared_ptr<Client> > clients_;
};
int main(int argc, char* argv[])
{
try
{
boost::asio::io_service io;
if (argc != 6)
{
std::cout << "Usage: cppstressbot [host] [#start_channel (0 - 99999)] [#end_channel-1 (1 - 100000)] [#users per channel (1-1000)] [launch delay in ms]" << std::endl;
return 1;
}
char* host = argv[1];
std::istringstream startChannelStr(argv[2]);
std::istringstream endChannelStr(argv[3]);
std::istringstream usersPerChannelStr(argv[4]);
std::istringstream launchDelayInMsStr(argv[5]);
int startChannel;
int endChannel;
int usersPerChannel;
int launchDelayInMs;
startChannelStr >> startChannel;
endChannelStr >> endChannel;
usersPerChannelStr >> usersPerChannel;
launchDelayInMsStr >> launchDelayInMs;
std::cout << "Running with:" << std::endl;
std::cout << " - host:" << host << std::endl;
std::cout << " - startChannel:" << startChannel <<std::endl;
std::cout << " - endChannel:" << endChannel << std::endl;
std::cout << " - usersPerChannel:" << usersPerChannel << std::endl;
std::cout << " - launch delay:" << launchDelayInMs << " ms" << std::endl;
Simulation s(io, host, startChannel, endChannel, usersPerChannel, launchDelayInMs);
/*
std::cout << "Creating clients..." << std::endl;
std::cout << "Done creating clients..." << std::endl;
*/
int noOfThreads = 4;
boost::thread_group group;
for (int i = 0; i < noOfThreads; ++i)
group.add_thread(new boost::thread(boost::bind(runIoAndReportExceptions, &io)));
std::cout << "Worker threads ready..." << std::endl;
group.join_all();
}
catch (boost::system::system_error& error)
{
std::cout << error.what();
}
catch (std::exception& error)
{
std::cout << error.what();
}
catch (...)
{
std::cout << "Unknown exception";
}
std::cout << std::endl;
return 0;
}
(set! *warn-on-reflection* true)
(ns chat.main
(:require
[chat.common]
[chat.channel]
[chat.server]
[chat.user]
))
(defmethod print-method clojure.lang.IDeref [o w] (.write w (format "#<%s@%x>" (.getSimpleName (class o)) (System/identityHashCode o))))
(def jonas (chat.user/create "Jonas"))
(def marcus (chat.user/create "Marcus"))
(def lobby (chat.channel/create "Lobby"))
(def channel-numbers (range 5000))
(def user-numbers (range 10))
(defn channel-name
[channel-number]
(str "channel_" (format "%05d" channel-number)))
(defn user-name
[channel-number user-number]
(str
"stressbot_"
(format "%05d" channel-number)
"_"
(format "%03d" user-number)))
(doseq
[channel-number channel-numbers]
(let
[channel (chat.channel/create (channel-name channel-number))]
(doseq [user-number user-numbers]
(let
[user (chat.user/create (user-name channel-number user-number))]
(chat.common/add-user-to-channel user channel)))))
(chat.common/add-user-to-channel jonas lobby)
(chat.common/add-user-to-channel marcus lobby)
(def channel-00000 (@chat.channel/channels "channel_00000"))
(chat.common/add-user-to-channel jonas channel-00000)
(chat.common/add-user-to-channel marcus channel-00000)
;(lobby :send :write ((jonas :state) :user-name) "Hej")
(def server (chat.server/create 4711))
(ns chat.session
(:use
[clojure.contrib.def])
(:require
[chat.common]
[chat.session]
[chat.user])
(:import
[java.net InetSocketAddress]
[java.util.concurrent Executors]
[org.jboss.netty.bootstrap ServerBootstrap]
[org.jboss.netty.channel Channel Channels ChannelPipelineFactory SimpleChannelHandler]
[org.jboss.netty.channel.group DefaultChannelGroup]
[org.jboss.netty.channel.socket.nio NioServerSocketChannelFactory]
[org.jboss.netty.buffer ChannelBuffers]
[org.jboss.netty.handler.codec.frame DelimiterBasedFrameDecoder Delimiters]))
;helpers
(defn encode
"Creates a buffer prepared for a Flash XMLSocket"
[#^String s]
(let
[byte-arr (.getBytes (str s "\0") "UTF-8")]
(ChannelBuffers/wrappedBuffer
byte-arr)))
(def flash-policy-file
(str "<?xml version=\"1.0\"?>"
"<!DOCTYPE cross-domain-policy SYSTEM \"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd\">"
"<cross-domain-policy>"
"<allow-access-from domain=\"195.198.115.136\" to-ports=\"4711\"/>"
"</cross-domain-policy>"))
(defn- login
[this user-name]
(let
[user (@chat.user/users user-name)]
(if user
(chat.common/login this user)
(str "error No such user: [" user-name "]" ))))
(defn- handle-message
[this msg]
(let
[msg-seq (.split #" " msg)
command (first msg-seq)
args (rest msg-seq)]
(cond
(= "login" command) (apply login this args)
:else (str "error Unknown Command: " command))))
(defn- session-parse
[this msg]
(cond
(.isEmpty #^String msg) "error Empty Message"
(= '<' (get msg 0)) flash-policy-file
:else (handle-message this msg)))
(defn- receive
[this msg]
(let
[user (this :user)
reply (if user
(user :parse msg)
(session-parse this msg))]
(when reply
(do
(this :write reply)))))
(defn- write
[this msg]
(let
[#^Channel netty-channel ((this :state) :netty-channel)]
(when (.isOpen netty-channel)
(.write netty-channel (encode msg)))))
(defn- disconnect
[this]
(let
[user (this :user)]
(when user
(user :send :logout))))
(defn dispatcher
[agt message & args]
(let
[this (partial dispatcher agt)]
(cond
(= message :agent) agt
(= message :state) @agt
(= message :user) (@agt :user)
(= message :receive) (apply receive this args)
(= message :write) (apply write this args)
(= message :disconnect) (apply disconnect this args)
(= message :send)
(do
(apply chat.common/send-message agt args)
this)
:else (throw (IllegalArgumentException.
(str "unknown message in [" "session" "] " message))))))
(defn- set-user
[session user]
(conj session {:user user}))
(defn- close
[session]
(let
[#^Channel netty-channel (session :netty-channel)]
(when (.isOpen netty-channel)
(.close netty-channel))
(conj session {:user nil})))
(defn create
[netty-channel]
(partial dispatcher
(agent
{:netty-channel netty-channel
:user nil
:set-user set-user
:close close})))
(ns chat.session
(:use
[clojure.contrib.def])
(:require
[chat.common]
[chat.session]
[chat.user])
(:import
[java.net InetSocketAddress]
[java.util.concurrent Executors]
[org.jboss.netty.bootstrap ServerBootstrap]
[org.jboss.netty.channel Channel Channels ChannelPipelineFactory SimpleChannelHandler]
[org.jboss.netty.channel.group DefaultChannelGroup]
[org.jboss.netty.channel.socket.nio NioServerSocketChannelFactory]
[org.jboss.netty.buffer ChannelBuffers]
[org.jboss.netty.handler.codec.frame DelimiterBasedFrameDecoder Delimiters]))
;helpers
(defn encode
"Creates a buffer prepared for a Flash XMLSocket"
[#^String s]
(let
[byte-arr (.getBytes (str s "\0") "UTF-8")]
(ChannelBuffers/wrappedBuffer
byte-arr)))
(def flash-policy-file
(str "<?xml version=\"1.0\"?>"
"<!DOCTYPE cross-domain-policy SYSTEM \"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd\">"
"<cross-domain-policy>"
"<allow-access-from domain=\"195.198.115.136\" to-ports=\"4711\"/>"
"</cross-domain-policy>"))
(defn- login
[this user-name]
(let
[user (@chat.user/users user-name)]
(if user
(chat.common/login this user)
(str "error No such user: [" user-name "]" ))))
(defn- handle-message
[this msg]
(let
[msg-seq (.split #" " msg)
command (first msg-seq)
args (rest msg-seq)]
(cond
(= "login" command) (apply login this args)
:else (str "error Unknown Command: " command))))
(defn- session-parse
[this msg]
(cond
(.isEmpty #^String msg) "error Empty Message"
(= '<' (get msg 0)) flash-policy-file
:else (handle-message this msg)))
(defn- receive
[this msg]
(let
[user (this :user)
reply (if user
(user :parse msg)
(session-parse this msg))]
(when reply
(do
(this :write reply)))))
(defn- write
[this msg]
(let
[#^Channel netty-channel ((this :state) :netty-channel)]
(when (.isOpen netty-channel)
(.write netty-channel (encode msg)))))
(defn- disconnect
[this]
(let
[user (this :user)]
(when user
(user :send :logout))))
(defn dispatcher
[agt message & args]
(let
[this (partial dispatcher agt)]
(cond
(= message :agent) agt
(= message :state) @agt
(= message :user) (@agt :user)
(= message :receive) (apply receive this args)
(= message :write) (apply write this args)
(= message :disconnect) (apply disconnect this args)
(= message :send)
(do
(apply chat.common/send-message agt args)
this)
:else (throw (IllegalArgumentException.
(str "unknown message in [" "session" "] " message))))))
(defn- set-user
[session user]
(conj session {:user user}))
(defn- close
[session]
(let
[#^Channel netty-channel (session :netty-channel)]
(when (.isOpen netty-channel)
(.close netty-channel))
(conj session {:user nil})))
(defn create
[netty-channel]
(partial dispatcher
(agent
{:netty-channel netty-channel
:user nil
:set-user set-user
:close close})))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment