-
-
Save bjering/25aa623f0df970b7282d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Performance benchmark chat system - definitions and protocol | |
Definitions | |
User - Identified by a unique username, a user is either offline or online. | |
Channel - Identified by a unique name, a channel has a fixed set of users as members, what users are offline or online is not fixed. | |
Protocol | |
Client -> Server | |
login [username] - logs in a user, a user never expicitly logs off, but implicitly does so when the connection that made the logon call is disconnected. | |
say [channelname] [message] - say a message in a channel | |
ping [id] - request a pong | |
Server -> Client | |
join [channelname] [username] - a user comes online (send for every channel the user is a member of) | |
leave [channelname] [username]- a user goes offline (send for every channel the user is a member of) | |
pong [id] - sent as a response to a ping, using the same id | |
message [channelname] [username] [message] - sent on every new message in a channel the user is member of | |
error [message] - sent to inform the client of any kind of error | |
resume - sent as response to a a login if that user was already logged in on another session, that old session will have been closed. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns chat.channel | |
(:use | |
[clojure.contrib.def] | |
[chat.common])) | |
(defvar channels (agent {})) | |
(defn- list-users | |
[channel] | |
(map #(% :user-name) (@channel :users))) | |
(defn- write | |
[channel message] | |
(doseq [user (@channel :users)] | |
(user :send :write message)) | |
nil) | |
(defn- say | |
[channel user-name message] | |
(write channel (str "message " (@channel :channel-name) " " user-name " " message))) | |
(defn- user-join | |
[channel user-name] | |
(write channel (str "join " (@channel :channel-name) " " user-name))) | |
(defn- leave | |
[channel user-name] | |
(write channel (str "leave " (@channel :channel-name) " " user-name))) | |
(defn dispatcher | |
[state message & args] | |
(cond | |
(= message :agent) state | |
(= message :state) @state | |
(= message :channel-name) (@state :channel-name) | |
(= message :list-users) (apply list-users state args) | |
(= message :join) (apply user-join state args) | |
(= message :leave) (apply leave state args) | |
(= message :say) (apply say state args) | |
(= message :send) | |
(do | |
(apply chat.common/send-message state args) | |
(partial dispatcher state)) | |
:else (throw (IllegalArgumentException. | |
(str "unknown message in [" "channel" "] " message))))) | |
(defn- add-user | |
[channel user] | |
(conj channel {:users (conj (channel :users) user)})) | |
(defn create | |
[channel-name] | |
(let | |
[ | |
new-channel | |
(partial dispatcher | |
(agent | |
{ | |
:channel-name channel-name | |
:users '() | |
:add-user add-user | |
}))] | |
(send channels conj {channel-name new-channel}) | |
new-channel)) | |
(defn delete-all | |
[] | |
(defvar channels (agent {}))) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns chat.common) | |
(defn send-message | |
[agt message-name & args] | |
;(println "Sending " message-name " to " agt " with args = " args) | |
(send agt #(apply (% message-name) % args))) | |
; I realize this are two unsyncronized agents and | |
; for a short while it could be that a user | |
; is added to the channel, but not the channel | |
; to the user, and vice versa. | |
; This is ok! | |
(defn add-user-to-channel | |
[user channel] | |
(user :send :add-channel channel) | |
(channel :send :add-user user)) | |
(defn login | |
[session user] | |
(session :send :set-user user) | |
(user :send :set-session session) | |
nil) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(set! *warn-on-reflection* true) | |
(ns chat.main | |
(:require | |
[chat.common] | |
[chat.channel] | |
[chat.server] | |
[chat.user] | |
)) | |
(defmethod print-method clojure.lang.IDeref [o w] (.write w (format "#<%s@%x>" (.getSimpleName (class o)) (System/identityHashCode o)))) | |
(def jonas (chat.user/create "Jonas")) | |
(def marcus (chat.user/create "Marcus")) | |
(def lobby (chat.channel/create "Lobby")) | |
(def channel-numbers (range 5000)) | |
(def user-numbers (range 10)) | |
(defn channel-name | |
[channel-number] | |
(str "channel_" (format "%05d" channel-number))) | |
(defn user-name | |
[channel-number user-number] | |
(str | |
"stressbot_" | |
(format "%05d" channel-number) | |
"_" | |
(format "%03d" user-number))) | |
(doseq | |
[channel-number channel-numbers] | |
(let | |
[channel (chat.channel/create (channel-name channel-number))] | |
(doseq [user-number user-numbers] | |
(let | |
[user (chat.user/create (user-name channel-number user-number))] | |
(chat.common/add-user-to-channel user channel))))) | |
(chat.common/add-user-to-channel jonas lobby) | |
(chat.common/add-user-to-channel marcus lobby) | |
(def channel-00000 (@chat.channel/channels "channel_00000")) | |
(chat.common/add-user-to-channel jonas channel-00000) | |
(chat.common/add-user-to-channel marcus channel-00000) | |
;(lobby :send :write ((jonas :state) :user-name) "Hej") | |
(def server (chat.server/create 4711)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <iomanip> | |
#include <string> | |
#include <sstream> | |
#include <vector> | |
#include <boost/asio.hpp> | |
#include <boost/bind.hpp> | |
#include <boost/shared_ptr.hpp> | |
#include <boost/thread.hpp> | |
#define PORT 4711 | |
class Client | |
{ | |
public: | |
Client(boost::asio::io_service& io, const std::string& name, const std::string& channelName) | |
: timer_(io), socket_(io), strand_(io), name_(name), channelName_(channelName), counter_(0) | |
{ | |
} | |
void startConnect(const std::string& host) | |
{ | |
// std::cout << "start connect" << std::endl; | |
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(host), PORT); | |
socket_.async_connect(endpoint, boost::bind(&Client::onConnect, this, boost::asio::placeholders::error)); | |
} | |
private: | |
void onConnect(const boost::system::error_code& error) | |
{ | |
if (error == boost::asio::error::fd_set_failure) | |
{ | |
std::cout << "From onConnect" << std::endl; | |
throw boost::system::system_error(error); | |
} | |
else if (error == boost::asio::error::no_buffer_space) | |
{ | |
std::cout << "From onConnect" << std::endl; | |
throw boost::system::system_error(error); | |
} | |
startRead(); | |
startWrite("login " + name_); | |
startBehavior(); | |
} | |
void startRead() | |
{ | |
boost::asio::async_read_until(socket_, readBuf_, '\0', strand_.wrap(boost::bind(&Client::onReadComplete, this, boost::asio::placeholders::error))); | |
} | |
void Client::onReadComplete(boost::system::error_code error) | |
{ | |
if (error == boost::asio::error::connection_reset // Client disconnected hard | |
|| error == boost::asio::error::operation_aborted // Server-side closed socket while doing async_read? | |
// || error.value() == 1236) // Windows specific hack - Might be needed? Used to be in old code. | |
|| error == boost::asio::error::eof) // The other side closed the socket | |
{ | |
std::cout << "From onReadComplete" << std::endl; | |
throw boost::system::system_error(error); | |
} | |
parse(); | |
startRead(); | |
} | |
void Client::parse() | |
{ | |
std::istream in(&readBuf_); | |
std::string line; | |
std::getline(in, line); | |
} | |
void startWrite(const std::string& message) | |
{ | |
writeBuf_ = message; // copy to ensure writeBuf_.c_str() buffer is legal during transmission. | |
boost::asio::async_write(socket_, boost::asio::buffer(writeBuf_.c_str(), writeBuf_.size() + 1), strand_.wrap(boost::bind(&Client::onWriteComplete, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); | |
} | |
void onWriteComplete(const boost::system::error_code& error, size_t bytesTransferred) | |
{ | |
if (error) | |
{ | |
std::cout << "From onWriteComplete" << std::endl; | |
throw boost::system::system_error(error); | |
} | |
} | |
void startBehavior() | |
{ | |
timer_.expires_from_now(boost::posix_time::milliseconds(static_cast<int>((10000.0 * std::rand() / RAND_MAX)))); | |
timer_.async_wait(strand_.wrap(boost::bind(&Client::onBehavior, this, boost::asio::placeholders::error))); | |
} | |
void onBehavior(const boost::system::error_code& error) | |
{ | |
if (error) | |
{ | |
std::cout << "From onBehavior" << std::endl; | |
throw boost::system::system_error(error); | |
} | |
std::ostringstream out; | |
out << "say " << channelName_ << " Game is unbalanced, nerf the $$$-items NOW! - [" << counter_++ << "]"; | |
startWrite(out.str()); | |
startBehavior(); | |
} | |
boost::asio::deadline_timer timer_; | |
boost::asio::ip::tcp::socket socket_; | |
boost::asio::strand strand_; | |
std::string name_; | |
std::string channelName_; | |
int counter_; | |
std::string writeBuf_; | |
boost::asio::streambuf readBuf_; | |
}; | |
void runIoAndReportExceptions(boost::asio::io_service* io) | |
{ | |
try | |
{ | |
io->run(); | |
} | |
catch (boost::system::system_error& error) | |
{ | |
std::cout << error.what(); | |
} | |
catch (std::exception& error) | |
{ | |
std::cout << error.what(); | |
} | |
catch (...) | |
{ | |
std::cout << "Unknown exception"; | |
} | |
} | |
class Simulation | |
{ | |
public: | |
Simulation(boost::asio::io_service& io, const std::string& host, int startChannel, int endChannel, int usersPerChannel, int launchDelayInMs) | |
: io_(io), host_(host), timer_(io), startChannel_(startChannel), endChannel_(endChannel), usersPerChannel_(usersPerChannel), launchDelayInMs_(launchDelayInMs) | |
{ | |
std::cout << "Simulation created" << std::endl; | |
timer_.expires_from_now(boost::posix_time::milliseconds(1000)); | |
timer_.async_wait(boost::bind(&Simulation::createClients, this, boost::asio::placeholders::error)); | |
} | |
private: | |
void createClients(const boost::system::error_code& error) | |
{ | |
if (error) | |
throw boost::system::system_error(error); | |
std::cout << "Creating clients..." << std::endl; | |
for (int channelNo = startChannel_; channelNo < endChannel_; ++channelNo) | |
{ | |
std::ostringstream channelName; | |
channelName << "channel_" << std::setfill('0') << std::setw(5) << channelNo; | |
for (int userNo = 0; userNo < usersPerChannel_; ++userNo) | |
{ | |
std::ostringstream name; | |
name << "stressbot_" << std::setfill('0') << std::setw(5) << channelNo << "_" << std::setw(3) << userNo; | |
clients_.push_back(boost::shared_ptr<Client>(new Client(io_, name.str(), channelName.str()))); | |
} | |
} | |
std::cout << "Done Creating clients..." << std::endl; | |
timer_.expires_from_now(boost::posix_time::milliseconds(1000)); | |
timer_.async_wait(boost::bind(&Simulation::connectClients, this, boost::asio::placeholders::error)); | |
} | |
void connectClients(const boost::system::error_code& error) | |
{ | |
if (error) | |
throw boost::system::system_error(error); | |
std::cout << "Connecting clients..." << std::endl; | |
for (int i = 0; i < clients_.size(); ++i) | |
{ | |
clients_[i]->startConnect(host_); | |
boost::this_thread::sleep(boost::posix_time::milliseconds(launchDelayInMs_)); | |
} | |
std::cout << "Done Connecting clients..." << std::endl; | |
} | |
boost::asio::io_service& io_; | |
const std::string host_; | |
boost::asio::deadline_timer timer_; | |
int startChannel_; | |
int endChannel_; | |
int usersPerChannel_; | |
int launchDelayInMs_; | |
std::vector<boost::shared_ptr<Client> > clients_; | |
}; | |
int main(int argc, char* argv[]) | |
{ | |
try | |
{ | |
boost::asio::io_service io; | |
if (argc != 6) | |
{ | |
std::cout << "Usage: cppstressbot [host] [#start_channel (0 - 99999)] [#end_channel-1 (1 - 100000)] [#users per channel (1-1000)] [launch delay in ms]" << std::endl; | |
return 1; | |
} | |
char* host = argv[1]; | |
std::istringstream startChannelStr(argv[2]); | |
std::istringstream endChannelStr(argv[3]); | |
std::istringstream usersPerChannelStr(argv[4]); | |
std::istringstream launchDelayInMsStr(argv[5]); | |
int startChannel; | |
int endChannel; | |
int usersPerChannel; | |
int launchDelayInMs; | |
startChannelStr >> startChannel; | |
endChannelStr >> endChannel; | |
usersPerChannelStr >> usersPerChannel; | |
launchDelayInMsStr >> launchDelayInMs; | |
std::cout << "Running with:" << std::endl; | |
std::cout << " - host:" << host << std::endl; | |
std::cout << " - startChannel:" << startChannel <<std::endl; | |
std::cout << " - endChannel:" << endChannel << std::endl; | |
std::cout << " - usersPerChannel:" << usersPerChannel << std::endl; | |
std::cout << " - launch delay:" << launchDelayInMs << " ms" << std::endl; | |
Simulation s(io, host, startChannel, endChannel, usersPerChannel, launchDelayInMs); | |
/* | |
std::cout << "Creating clients..." << std::endl; | |
std::cout << "Done creating clients..." << std::endl; | |
*/ | |
int noOfThreads = 4; | |
boost::thread_group group; | |
for (int i = 0; i < noOfThreads; ++i) | |
group.add_thread(new boost::thread(boost::bind(runIoAndReportExceptions, &io))); | |
std::cout << "Worker threads ready..." << std::endl; | |
group.join_all(); | |
} | |
catch (boost::system::system_error& error) | |
{ | |
std::cout << error.what(); | |
} | |
catch (std::exception& error) | |
{ | |
std::cout << error.what(); | |
} | |
catch (...) | |
{ | |
std::cout << "Unknown exception"; | |
} | |
std::cout << std::endl; | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(set! *warn-on-reflection* true) | |
(ns chat.main | |
(:require | |
[chat.common] | |
[chat.channel] | |
[chat.server] | |
[chat.user] | |
)) | |
(defmethod print-method clojure.lang.IDeref [o w] (.write w (format "#<%s@%x>" (.getSimpleName (class o)) (System/identityHashCode o)))) | |
(def jonas (chat.user/create "Jonas")) | |
(def marcus (chat.user/create "Marcus")) | |
(def lobby (chat.channel/create "Lobby")) | |
(def channel-numbers (range 5000)) | |
(def user-numbers (range 10)) | |
(defn channel-name | |
[channel-number] | |
(str "channel_" (format "%05d" channel-number))) | |
(defn user-name | |
[channel-number user-number] | |
(str | |
"stressbot_" | |
(format "%05d" channel-number) | |
"_" | |
(format "%03d" user-number))) | |
(doseq | |
[channel-number channel-numbers] | |
(let | |
[channel (chat.channel/create (channel-name channel-number))] | |
(doseq [user-number user-numbers] | |
(let | |
[user (chat.user/create (user-name channel-number user-number))] | |
(chat.common/add-user-to-channel user channel))))) | |
(chat.common/add-user-to-channel jonas lobby) | |
(chat.common/add-user-to-channel marcus lobby) | |
(def channel-00000 (@chat.channel/channels "channel_00000")) | |
(chat.common/add-user-to-channel jonas channel-00000) | |
(chat.common/add-user-to-channel marcus channel-00000) | |
;(lobby :send :write ((jonas :state) :user-name) "Hej") | |
(def server (chat.server/create 4711)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns chat.session | |
(:use | |
[clojure.contrib.def]) | |
(:require | |
[chat.common] | |
[chat.session] | |
[chat.user]) | |
(:import | |
[java.net InetSocketAddress] | |
[java.util.concurrent Executors] | |
[org.jboss.netty.bootstrap ServerBootstrap] | |
[org.jboss.netty.channel Channel Channels ChannelPipelineFactory SimpleChannelHandler] | |
[org.jboss.netty.channel.group DefaultChannelGroup] | |
[org.jboss.netty.channel.socket.nio NioServerSocketChannelFactory] | |
[org.jboss.netty.buffer ChannelBuffers] | |
[org.jboss.netty.handler.codec.frame DelimiterBasedFrameDecoder Delimiters])) | |
;helpers | |
(defn encode | |
"Creates a buffer prepared for a Flash XMLSocket" | |
[#^String s] | |
(let | |
[byte-arr (.getBytes (str s "\0") "UTF-8")] | |
(ChannelBuffers/wrappedBuffer | |
byte-arr))) | |
(def flash-policy-file | |
(str "<?xml version=\"1.0\"?>" | |
"<!DOCTYPE cross-domain-policy SYSTEM \"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd\">" | |
"<cross-domain-policy>" | |
"<allow-access-from domain=\"195.198.115.136\" to-ports=\"4711\"/>" | |
"</cross-domain-policy>")) | |
(defn- login | |
[this user-name] | |
(let | |
[user (@chat.user/users user-name)] | |
(if user | |
(chat.common/login this user) | |
(str "error No such user: [" user-name "]" )))) | |
(defn- handle-message | |
[this msg] | |
(let | |
[msg-seq (.split #" " msg) | |
command (first msg-seq) | |
args (rest msg-seq)] | |
(cond | |
(= "login" command) (apply login this args) | |
:else (str "error Unknown Command: " command)))) | |
(defn- session-parse | |
[this msg] | |
(cond | |
(.isEmpty #^String msg) "error Empty Message" | |
(= '<' (get msg 0)) flash-policy-file | |
:else (handle-message this msg))) | |
(defn- receive | |
[this msg] | |
(let | |
[user (this :user) | |
reply (if user | |
(user :parse msg) | |
(session-parse this msg))] | |
(when reply | |
(do | |
(this :write reply))))) | |
(defn- write | |
[this msg] | |
(let | |
[#^Channel netty-channel ((this :state) :netty-channel)] | |
(when (.isOpen netty-channel) | |
(.write netty-channel (encode msg))))) | |
(defn- disconnect | |
[this] | |
(let | |
[user (this :user)] | |
(when user | |
(user :send :logout)))) | |
(defn dispatcher | |
[agt message & args] | |
(let | |
[this (partial dispatcher agt)] | |
(cond | |
(= message :agent) agt | |
(= message :state) @agt | |
(= message :user) (@agt :user) | |
(= message :receive) (apply receive this args) | |
(= message :write) (apply write this args) | |
(= message :disconnect) (apply disconnect this args) | |
(= message :send) | |
(do | |
(apply chat.common/send-message agt args) | |
this) | |
:else (throw (IllegalArgumentException. | |
(str "unknown message in [" "session" "] " message)))))) | |
(defn- set-user | |
[session user] | |
(conj session {:user user})) | |
(defn- close | |
[session] | |
(let | |
[#^Channel netty-channel (session :netty-channel)] | |
(when (.isOpen netty-channel) | |
(.close netty-channel)) | |
(conj session {:user nil}))) | |
(defn create | |
[netty-channel] | |
(partial dispatcher | |
(agent | |
{:netty-channel netty-channel | |
:user nil | |
:set-user set-user | |
:close close}))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns chat.session | |
(:use | |
[clojure.contrib.def]) | |
(:require | |
[chat.common] | |
[chat.session] | |
[chat.user]) | |
(:import | |
[java.net InetSocketAddress] | |
[java.util.concurrent Executors] | |
[org.jboss.netty.bootstrap ServerBootstrap] | |
[org.jboss.netty.channel Channel Channels ChannelPipelineFactory SimpleChannelHandler] | |
[org.jboss.netty.channel.group DefaultChannelGroup] | |
[org.jboss.netty.channel.socket.nio NioServerSocketChannelFactory] | |
[org.jboss.netty.buffer ChannelBuffers] | |
[org.jboss.netty.handler.codec.frame DelimiterBasedFrameDecoder Delimiters])) | |
;helpers | |
(defn encode | |
"Creates a buffer prepared for a Flash XMLSocket" | |
[#^String s] | |
(let | |
[byte-arr (.getBytes (str s "\0") "UTF-8")] | |
(ChannelBuffers/wrappedBuffer | |
byte-arr))) | |
(def flash-policy-file | |
(str "<?xml version=\"1.0\"?>" | |
"<!DOCTYPE cross-domain-policy SYSTEM \"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd\">" | |
"<cross-domain-policy>" | |
"<allow-access-from domain=\"195.198.115.136\" to-ports=\"4711\"/>" | |
"</cross-domain-policy>")) | |
(defn- login | |
[this user-name] | |
(let | |
[user (@chat.user/users user-name)] | |
(if user | |
(chat.common/login this user) | |
(str "error No such user: [" user-name "]" )))) | |
(defn- handle-message | |
[this msg] | |
(let | |
[msg-seq (.split #" " msg) | |
command (first msg-seq) | |
args (rest msg-seq)] | |
(cond | |
(= "login" command) (apply login this args) | |
:else (str "error Unknown Command: " command)))) | |
(defn- session-parse | |
[this msg] | |
(cond | |
(.isEmpty #^String msg) "error Empty Message" | |
(= '<' (get msg 0)) flash-policy-file | |
:else (handle-message this msg))) | |
(defn- receive | |
[this msg] | |
(let | |
[user (this :user) | |
reply (if user | |
(user :parse msg) | |
(session-parse this msg))] | |
(when reply | |
(do | |
(this :write reply))))) | |
(defn- write | |
[this msg] | |
(let | |
[#^Channel netty-channel ((this :state) :netty-channel)] | |
(when (.isOpen netty-channel) | |
(.write netty-channel (encode msg))))) | |
(defn- disconnect | |
[this] | |
(let | |
[user (this :user)] | |
(when user | |
(user :send :logout)))) | |
(defn dispatcher | |
[agt message & args] | |
(let | |
[this (partial dispatcher agt)] | |
(cond | |
(= message :agent) agt | |
(= message :state) @agt | |
(= message :user) (@agt :user) | |
(= message :receive) (apply receive this args) | |
(= message :write) (apply write this args) | |
(= message :disconnect) (apply disconnect this args) | |
(= message :send) | |
(do | |
(apply chat.common/send-message agt args) | |
this) | |
:else (throw (IllegalArgumentException. | |
(str "unknown message in [" "session" "] " message)))))) | |
(defn- set-user | |
[session user] | |
(conj session {:user user})) | |
(defn- close | |
[session] | |
(let | |
[#^Channel netty-channel (session :netty-channel)] | |
(when (.isOpen netty-channel) | |
(.close netty-channel)) | |
(conj session {:user nil}))) | |
(defn create | |
[netty-channel] | |
(partial dispatcher | |
(agent | |
{:netty-channel netty-channel | |
:user nil | |
:set-user set-user | |
:close close}))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment