Last active
December 18, 2018 08:55
-
-
Save zzxx-husky/e1edafa4522dc80558a0288bb208cea7 to your computer and use it in GitHub Desktop.
Minor Update
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "statistic_monitor.hpp" | |
#include "cassert" | |
#include "caf/all.hpp" | |
#include "caf/io/all.hpp" | |
using namespace caf; | |
using namespace chrono; | |
using namespace literals; | |
string self_host; | |
vector <string> worker_hosts; | |
vector <actor> global_kvstores; | |
// each machine has one worker and all workers use the same port | |
unsigned port = 22346; | |
unsigned num_requesters = 20; | |
unsigned num_kvstores = 10; | |
using start = atom_constant<atom("Start")>; | |
using req = atom_constant<atom("Req")>; | |
using rep = atom_constant<atom("Rep")>; | |
using kvstore = atom_constant<atom("KVStore")>; | |
struct KVStore : public event_based_actor { | |
unsigned num = 0; | |
StatisticMonitor<unsigned> mon_num; // monitor request receiving tput | |
KVStore(actor_config &cfg) : event_based_actor(cfg) { | |
mon_num.set_name("num_kv").watch([=, old = 0u]() mutable { | |
unsigned snapshot = this->num; | |
unsigned diff = snapshot - old; | |
old = snapshot; | |
return diff; | |
}).launch(); | |
} | |
behavior make_behavior() override { | |
return { | |
[=](req, string key) { | |
++this->num; | |
auto r = actor_cast<actor>(this->current_sender()); | |
this->send<message_priority::high>(r, rep::value, key); | |
} | |
}; | |
} | |
}; | |
struct Requester : public event_based_actor { | |
hash <string> hasher{}; | |
unsigned num_rep = 0, num_req = 0; | |
StatisticMonitor<unsigned> mon_rep, mon_req; | |
Requester(actor_config &cfg) : event_based_actor(cfg) {} | |
behavior make_behavior() override { | |
return { | |
[=](start) { | |
// launch the dispatcher | |
auto d = this->spawn(Requester::dispatcher); | |
this->send(d, start::value); | |
mon_req.set_name("num_req").watch([=, old = 0u]() mutable { | |
unsigned snapshot = this->num_req; | |
unsigned diff = snapshot - old; | |
old = snapshot; | |
return diff; | |
}).launch(); | |
mon_rep.set_name("num_rep").watch([=, old = 0u]() mutable { | |
unsigned snapshot = this->num_rep; | |
unsigned diff = snapshot - old; | |
old = snapshot; | |
return diff; | |
}).launch(); | |
}, | |
[=](req, const string &key) { | |
// receive one elem from dispatcher and make request to kvstore instance | |
++num_req; | |
this->send(global_kvstores[hasher(key) % global_kvstores.size()], req::value, key); | |
}, | |
[=](rep, const string &reply) { | |
// get a reply from kvstore instances | |
++num_rep; | |
} | |
}; | |
} | |
static void dispatcher(blocking_actor *self) { | |
self->receive([=](start) { | |
auto r = actor_cast<actor>(self->current_sender()); | |
// 30000ms = 30s | |
string msg = "abcdefghijklmnopqrstuvwxyz0123456789"; | |
for (unsigned i = 0; i < 30000; i++) { | |
// 10 per ms, 10000 per second | |
for (unsigned j = 0; j < 2; j++) { | |
self->send(r, req::value, msg); | |
this_thread::sleep_for(1ms); | |
} | |
} | |
}); | |
} | |
}; | |
struct Worker : public event_based_actor { | |
vector <actor> requesters; | |
vector <actor> kvstores; | |
vector <actor> workers; | |
unsigned num_recv = 0; | |
Worker(actor_config &cfg) : event_based_actor(cfg) { | |
// create all requesters and kvstore instances | |
for (unsigned i = 0; i < num_requesters; i++) { | |
requesters.emplace_back(this->spawn<Requester>()); | |
} | |
for (unsigned i = 0; i < num_kvstores; i++) { | |
kvstores.emplace_back(this->spawn<KVStore>()); | |
} | |
// make things easy, wait for other worker peers to start | |
this->delayed_send(this, 5s, start::value); | |
} | |
behavior make_behavior() override { | |
return { | |
// sent by myself to ask me to work | |
[=](start) { | |
auto &mm = this->system().middleman(); | |
unsigned self_index = ~0u; | |
// find all other worker peers | |
for (unsigned i = 0; i < worker_hosts.size(); i++) { | |
string &w = worker_hosts[i]; | |
if (w != self_host) { | |
auto expectation = mm.remote_actor<actor>(w, port); | |
assert(!!expectation && "Cannot connect to peers"); | |
workers.push_back(expectation.value()); | |
} else { | |
// current sender is me | |
self_index = i; | |
workers.push_back(actor_cast<actor>(this->current_sender())); | |
} | |
} | |
assert(self_index != ~0u); | |
// tell all of them my kvstore instances | |
for (const auto &w : workers) { | |
this->send(w, kvstore::value, self_index, kvstores); | |
} | |
}, | |
// receive kvstore instances from worker peers | |
[=](kvstore, unsigned w_idx, const vector <actor> &kvstores) { | |
if (global_kvstores.empty()) { | |
global_kvstores.resize(num_kvstores * worker_hosts.size()); | |
} | |
copy(kvstores.begin(), kvstores.end(), global_kvstores.begin() + w_idx * num_kvstores); | |
if (++num_recv == worker_hosts.size()) { | |
for (const auto &a : requesters) { | |
this->send(a, start::value); | |
} | |
} | |
} | |
}; | |
} | |
}; | |
int main(int argc, char **argv) { | |
self_host = string(argv[1]); | |
// worker21, worker22, ..., worker30 | |
for (unsigned i = 21; i <= 30; i++) { | |
worker_hosts.push_back(string("worker") + to_string(i)); | |
} | |
actor_system_config config; | |
config.load<io::middleman>(); | |
actor_system system{config}; | |
actor worker = system.spawn<Worker>(); | |
auto expectation = system.middleman().publish(worker, port); | |
assert(!!expectation && "Worker cannot bind to the specified port"); | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
using namespace std; | |
// The StatisticMonitor prints the value returned by the `Watcher` function
// once per second from a background thread, prefixed with the monitor's name.
//
// Usage: monitor.set_name("x").watch(fn).launch(); ... monitor.terminate();
//
// Configure the name and the watcher before launch(); they are read by the
// background thread without further synchronisation. The background thread
// refers to this object, so the monitor is neither copyable nor movable, and
// the destructor stops the thread.
template<typename I>
class StatisticMonitor {
public:
  typedef std::function<I()> Watcher;

  StatisticMonitor() = default;
  StatisticMonitor(const StatisticMonitor &) = delete;
  StatisticMonitor &operator=(const StatisticMonitor &) = delete;

  // Stops the background thread (if any) before the members it uses go away.
  ~StatisticMonitor() {
    terminate();
  }

  // Sets the function whose return value is printed. Returns *this for chaining.
  StatisticMonitor &watch(Watcher tput) {
    this->current_value = std::move(tput);
    return *this;
  }

  // Sets the label printed in front of every sample. Returns *this for chaining.
  StatisticMonitor &set_name(std::string name) {
    this->name = std::move(name);
    return *this;
  }

  // Starts the printing thread. Does nothing if the monitor is already
  // running or if no watcher has been set (calling an empty std::function on
  // the thread would otherwise end the process).
  void launch() {
    if (timer.joinable() || !current_value) {
      return;
    }
    {
      // Re-arm the flag so that a monitor can be launched again after terminate().
      std::lock_guard<std::mutex> guard(stop_mutex);
      non_terminated = true;
    }
    // create a separated thread to do the job
    timer = std::thread([this]() {
      std::unique_lock<std::mutex> lock(stop_mutex);
      while (non_terminated) {
        // Sample and print without holding the lock so that terminate() is
        // never blocked by a slow watcher or by the output stream.
        lock.unlock();
        std::cout << name << ": " << current_value() << std::endl;
        lock.lock();
        // Sleeps for one second but wakes up at once when terminate() is called.
        stop_signal.wait_for(lock, std::chrono::milliseconds(1000),
                             [this]() { return !non_terminated; });
      }
      lock.unlock();
      std::cout << name << " now terminates. " << std::endl;
    });
  }

  // Stops the printing thread and waits for it to finish. Safe to call
  // repeatedly and on a monitor that was never launched.
  void terminate() {
    if (!timer.joinable()) {
      return;
    }
    {
      std::lock_guard<std::mutex> guard(stop_mutex);
      non_terminated = false;
    }
    stop_signal.notify_all();
    timer.join();
  }

private:
  bool non_terminated = true;          // guarded by stop_mutex
  std::thread timer;                   // joinable while the monitor is running
  std::string name;
  Watcher current_value;
  std::mutex stop_mutex;
  std::condition_variable stop_signal; // signalled by terminate()
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment