Skip to content

Instantly share code, notes, and snippets.

@zzxx-husky
Last active December 18, 2018 08:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zzxx-husky/e1edafa4522dc80558a0288bb208cea7 to your computer and use it in GitHub Desktop.
Save zzxx-husky/e1edafa4522dc80558a0288bb208cea7 to your computer and use it in GitHub Desktop.
Minor Update
#include "statistic_monitor.hpp"
#include "cassert"
#include "caf/all.hpp"
#include "caf/io/all.hpp"
using namespace caf;
using namespace chrono;
using namespace literals;
// Host name of this process; assigned in main() from the first command-line argument.
string self_host{};
// Host names of every worker taking part in the run; filled in main().
vector<string> worker_hosts{};
// Flat table of the KVStore handles of all workers. Worker fills it slice by
// slice (one slice of `num_kvstores` entries per worker index) and Requester
// picks a target from it by hashing the key.
vector<actor> global_kvstores{};
// each machine has one worker and all workers use the same port
unsigned port{22346};
// Number of Requester actors spawned by each Worker.
unsigned num_requesters{20};
// Number of KVStore actors spawned by each Worker.
unsigned num_kvstores{10};
// Message tags (atoms) used to select the handlers of the actors below.
using start = atom_constant<atom("Start")>;
using req = atom_constant<atom("Req")>;
using rep = atom_constant<atom("Rep")>;
using kvstore = atom_constant<atom("KVStore")>;
struct KVStore : public event_based_actor {
unsigned num = 0;
StatisticMonitor<unsigned> mon_num; // monitor request receiving tput
KVStore(actor_config &cfg) : event_based_actor(cfg) {
mon_num.set_name("num_kv").watch([=, old = 0u]() mutable {
unsigned snapshot = this->num;
unsigned diff = snapshot - old;
old = snapshot;
return diff;
}).launch();
}
behavior make_behavior() override {
return {
[=](req, string key) {
++this->num;
auto r = actor_cast<actor>(this->current_sender());
this->send<message_priority::high>(r, rep::value, key);
}
};
}
};
struct Requester : public event_based_actor {
hash <string> hasher{};
unsigned num_rep = 0, num_req = 0;
StatisticMonitor<unsigned> mon_rep, mon_req;
Requester(actor_config &cfg) : event_based_actor(cfg) {}
behavior make_behavior() override {
return {
[=](start) {
// launch the dispatcher
auto d = this->spawn(Requester::dispatcher);
this->send(d, start::value);
mon_req.set_name("num_req").watch([=, old = 0u]() mutable {
unsigned snapshot = this->num_req;
unsigned diff = snapshot - old;
old = snapshot;
return diff;
}).launch();
mon_rep.set_name("num_rep").watch([=, old = 0u]() mutable {
unsigned snapshot = this->num_rep;
unsigned diff = snapshot - old;
old = snapshot;
return diff;
}).launch();
},
[=](req, const string &key) {
// receive one elem from dispatcher and make request to kvstore instance
++num_req;
this->send(global_kvstores[hasher(key) % global_kvstores.size()], req::value, key);
},
[=](rep, const string &reply) {
// get a reply from kvstore instances
++num_rep;
}
};
}
static void dispatcher(blocking_actor *self) {
self->receive([=](start) {
auto r = actor_cast<actor>(self->current_sender());
// 30000ms = 30s
string msg = "abcdefghijklmnopqrstuvwxyz0123456789";
for (unsigned i = 0; i < 30000; i++) {
// 10 per ms, 10000 per second
for (unsigned j = 0; j < 2; j++) {
self->send(r, req::value, msg);
this_thread::sleep_for(1ms);
}
}
});
}
};
// Per-machine coordinator. It spawns the local Requester and KVStore actors,
// connects to the Worker published on every other host in `worker_hosts`,
// exchanges KVStore handles with them, and starts the local requesters once
// the handles of all workers have arrived.
struct Worker : public event_based_actor {
  vector<actor> requesters;
  vector<actor> kvstores;
  vector<actor> workers;
  // Number of `kvstore` announcements accepted so far.
  unsigned num_recv = 0;

  Worker(actor_config &cfg) : event_based_actor(cfg) {
    // create all requesters and kvstore instances
    requesters.reserve(num_requesters);
    for (unsigned i = 0; i < num_requesters; i++) {
      requesters.emplace_back(this->spawn<Requester>());
    }
    kvstores.reserve(num_kvstores);
    for (unsigned i = 0; i < num_kvstores; i++) {
      kvstores.emplace_back(this->spawn<KVStore>());
    }
    // make things easy, wait for other worker peers to start
    this->delayed_send(this, 5s, start::value);
  }

  behavior make_behavior() override {
    return {
      // sent by myself to ask me to work
      [this](start) {
        auto &mm = this->system().middleman();
        // Kept as `unsigned`: it is sent as a message element and the
        // receiving handler matches on exactly this type.
        unsigned self_index = ~0u;
        // find all other worker peers
        for (unsigned i = 0; i < worker_hosts.size(); i++) {
          const string &w = worker_hosts[i];
          if (w != self_host) {
            auto expectation = mm.remote_actor<actor>(w, port);
            if (!expectation) {
              // Checked at runtime: an assert would vanish under NDEBUG and
              // the failed result would then be dereferenced.
              cerr << "Cannot connect to peers: " << w << endl;
              this->quit(expectation.error());
              return;
            }
            workers.push_back(std::move(*expectation));
          } else {
            self_index = i;
            // Use our own handle directly instead of assuming that the
            // sender of `start` is this actor.
            workers.push_back(actor_cast<actor>(this));
          }
        }
        if (self_index == ~0u) {
          cerr << "Host '" << self_host << "' is not in the worker list" << endl;
          this->quit(make_error(sec::invalid_argument));
          return;
        }
        // tell all of them my kvstore instances
        for (const auto &w : workers) {
          this->send(w, kvstore::value, self_index, kvstores);
        }
      },
      // receive kvstore instances from worker peers
      [this](kvstore, unsigned w_idx, const vector<actor> &peer_kvstores) {
        // The global table has one slice of `num_kvstores` handles per
        // worker; reject anything that would not fit exactly into a slice.
        if (w_idx >= worker_hosts.size() || peer_kvstores.size() != num_kvstores) {
          cerr << "Ignoring malformed kvstore announcement from worker index " << w_idx << endl;
          return;
        }
        if (global_kvstores.empty()) {
          global_kvstores.resize(num_kvstores * worker_hosts.size());
        }
        copy(peer_kvstores.begin(), peer_kvstores.end(),
             global_kvstores.begin() + w_idx * num_kvstores);
        // Once every worker has announced its stores the table is complete.
        if (++num_recv == worker_hosts.size()) {
          for (const auto &a : requesters) {
            this->send(a, start::value);
          }
        }
      }
    };
  }
};
int main(int argc, char **argv) {
self_host = string(argv[1]);
// worker21, worker22, ..., worker30
for (unsigned i = 21; i <= 30; i++) {
worker_hosts.push_back(string("worker") + to_string(i));
}
actor_system_config config;
config.load<io::middleman>();
actor_system system{config};
actor worker = system.spawn<Worker>();
auto expectation = system.middleman().publish(worker, port);
assert(!!expectation && "Worker cannot bind to the specified port");
}
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
using namespace std;
// The StatisticMonitor prints the value returned by the `Watcher` function
// every 1 second from a thread of its own, as "<name>: <value>".
//
// Usage: configure with set_name()/watch(), then call launch(). The thread is
// stopped by terminate() or, at the latest, by the destructor. Configuration,
// launch() and terminate() are expected to be called from one owning thread;
// the watcher itself runs on the monitor thread, so whatever it reads must be
// safe to read from there.
//
// The monitor owns a thread and is therefore not copyable.
template<typename I>
class StatisticMonitor {
public:
  typedef std::function<I()> Watcher;

  StatisticMonitor() = default;
  StatisticMonitor(const StatisticMonitor &) = delete;
  StatisticMonitor &operator=(const StatisticMonitor &) = delete;

  // Stops and joins the monitor thread so it never outlives this object.
  ~StatisticMonitor() {
    terminate();
  }

  // Sets the function whose return value is printed. Call before launch().
  StatisticMonitor &watch(Watcher tput) {
    this->current_value = std::move(tput);
    return *this;
  }

  // Sets the label printed in front of every value. Call before launch().
  StatisticMonitor &set_name(std::string name) {
    this->name = std::move(name);
    return *this;
  }

  // Starts the monitor thread. Does nothing if it is already running. Without
  // a watcher nothing is started, because calling an empty std::function on
  // the monitor thread would bring the whole process down.
  void launch() {
    if (timer.joinable()) {
      return;
    }
    if (!current_value) {
      std::cerr << name << ": no watcher set, monitor not launched" << std::endl;
      return;
    }
    {
      // Reset the flag so a monitor can be launched again after terminate().
      std::lock_guard<std::mutex> guard(mtx);
      non_terminated = true;
    }
    // create a separated thread to do the job
    timer = std::thread([this]() { run(); });
  }

  // Asks the monitor thread to stop, wakes it up if it is sleeping and waits
  // for it to finish. Safe to call repeatedly or without a prior launch().
  void terminate() {
    if (!timer.joinable()) {
      return;
    }
    {
      std::lock_guard<std::mutex> guard(mtx);
      non_terminated = false;
    }
    wake.notify_all();
    timer.join();
  }

private:
  // Body of the monitor thread: print, then sleep for one second or until
  // terminate() wakes us up, whichever comes first.
  void run() {
    std::unique_lock<std::mutex> lock(mtx);
    while (non_terminated) {
      // Do not hold the lock while running user code and doing I/O.
      lock.unlock();
      std::cout << name << ": " << current_value() << std::endl;
      lock.lock();
      wake.wait_for(lock, std::chrono::milliseconds(1000),
                    [this]() { return !non_terminated; });
    }
    lock.unlock();
    std::cout << name << " now terminates. " << std::endl;
  }

  std::mutex mtx;               // guards non_terminated
  std::condition_variable wake; // signalled by terminate()
  bool non_terminated = true;
  std::thread timer;
  std::string name;
  Watcher current_value;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment