Skip to content

Instantly share code, notes, and snippets.

@kumagi
Created April 24, 2010 10:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kumagi/377575 to your computer and use it in GitHub Desktop.
Save kumagi/377575 to your computer and use it in GitHub Desktop.
msgpack KVSserver sample
#include <stdint.h>
#include <stdlib.h>
#include <msgpack.hpp>
#include <mp/wavy.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <iostream>
#include <string>
#include <map>
enum op{
SET,
GET,
DELETE,
FOUND,
DONE,
NONE,
};
std::map<std::string,std::string> kvs;
char color[10][100] = {"\x1b[49m",// black
"\x1b[31m",// red
"\x1b[33m",// green
"\x1b[34m",// yellow
"\x1b[35m",// blue
"\x1b[36m",// magenta
};
char* idcolor(int identifier){
return color[identifier%6];
}
class handler : public mp::wavy::handler {
public:
handler(int fd) :
mp::wavy::handler(fd),
m_pac(64*1024) { }
void on_message(int fd, msgpack::object obj, msgpack::zone* z,int id)
{
using namespace std;
std::cerr << idcolor(id) << "arrive:" << obj << std::endl;
msgpack::type::tuple<int,std::string> out(obj);
int operation = out.get<0>();
string key = out.get<1>();
switch(operation){
case SET:{
msgpack::type::tuple<int,std::string,std::string> out(obj);
string value = out.get<2>();
kvs.insert(pair<string, string>(key,value));
msgpack::sbuffer sbuf;
int operation = DONE;
msgpack::pack(sbuf, operation);
write(fd, sbuf.data(), sbuf.size());
fprintf(stderr,"%sset %s,%s stored ",idcolor(id),key.c_str(),value.c_str());
break;
}
case GET:{
map<string,string>::iterator it = kvs.find(key);
if(it != kvs.end()){
msgpack::vrefbuffer vbuf;
msgpack::pack(vbuf, it->second);
int operation = FOUND;
msgpack::pack(vbuf, operation);
const struct iovec* iov = vbuf.vector();
writev(fd, iov, vbuf.vector_size());
fprintf(stderr,"get %s->%s\n",key.c_str(),it->second.c_str());
}else{
msgpack::sbuffer sbuf;
int operation = NONE;
msgpack::pack(sbuf, operation);
write(fd, sbuf.data(), sbuf.size());
fprintf(stderr,"get %s->[no]\n",key.c_str());
}
break;
}
case DELETE:{
msgpack::sbuffer sbuf;
map<string,string>::iterator it = kvs.find(key);
if(it != kvs.end()){
kvs.erase(it);
int operation = DONE;
msgpack::pack(sbuf, operation);
write(fd, sbuf.data(), sbuf.size());
fprintf(stderr,"delete %s->%s\n",key.c_str(),it->second.c_str());
}else{
int operation = NONE;
msgpack::pack(sbuf, operation);
write(fd, sbuf.data(), sbuf.size());
fprintf(stderr,"delete %s->[no]\n",key.c_str());
}
break;
}
}
}
void on_read(mp::wavy::event& e)
{
int id = rand() % 1000;
fprintf(stderr,"%sread %d start{ ",idcolor(id),id);
try{
while(true) {
if(m_pac.execute()) {
msgpack::object msg = m_pac.data();
std::auto_ptr<msgpack::zone> z( m_pac.release_zone() );
m_pac.reset();
e.more(); //e.next();
fprintf(stderr,"%spassing %d ",idcolor(id),id);
//std::cout << "object received: " << msg << std::endl;
on_message(fd(), msg, &*z,id);
fprintf(stderr,"%s}%d\n",idcolor(id),id);
return;
}
m_pac.reserve_buffer(8*1024);
int read_len = ::read(fd(), m_pac.buffer(), m_pac.buffer_capacity());
if(read_len <= 0) {
if(read_len == 0) { perror("closed"); throw mp::system_error(errno, "connection closed"); }
if(errno == EAGAIN || errno == EINTR) { fprintf(stderr,"%s}%d\n",idcolor(id),id); return; }
else { perror("read"); throw mp::system_error(errno, "read error"); }
}
m_pac.buffer_consumed(read_len);
}
}catch(...){
perror("exception ");
fprintf(stderr,"catched closed! %d\n",id);
::close(fd());
}
}
private:
msgpack::unpacker m_pac;
};
void on_accepted(int fd, int err, mp::wavy::loop* lo)
{
fprintf(stderr,"accept %d %d\n",fd,err);
if(fd < 0) {
perror("accept error");
exit(1);
}
try {
int on = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
lo->add_handler<handler>(fd);
} catch (...) {
fprintf(stderr,"listening socket error");
::close(fd);
return;
}
}
int main(void)
{
using namespace mp::placeholders;
mp::wavy::loop lo;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_port = htons(9090);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
lo.listen(PF_INET, SOCK_STREAM, 0,
(struct sockaddr*)&addr, sizeof(addr),
mp::bind(&on_accepted, _1, _2, &lo));
lo.run(2);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment