Created
April 24, 2010 10:18
-
-
Save kumagi/377575 to your computer and use it in GitHub Desktop.
msgpack KVSserver sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdint.h> | |
#include <stdlib.h> | |
#include <msgpack.hpp> | |
#include <mp/wavy.h> | |
#include <fcntl.h> | |
#include <errno.h> | |
#include <sys/types.h> | |
#include <sys/socket.h> | |
#include <arpa/inet.h> | |
#include <netinet/in.h> | |
#include <netinet/tcp.h> | |
#include <iostream> | |
#include <string> | |
#include <map> | |
enum op{ | |
SET, | |
GET, | |
DELETE, | |
FOUND, | |
DONE, | |
NONE, | |
}; | |
std::map<std::string,std::string> kvs; | |
char color[10][100] = {"\x1b[49m",// black | |
"\x1b[31m",// red | |
"\x1b[33m",// green | |
"\x1b[34m",// yellow | |
"\x1b[35m",// blue | |
"\x1b[36m",// magenta | |
}; | |
char* idcolor(int identifier){ | |
return color[identifier%6]; | |
} | |
class handler : public mp::wavy::handler { | |
public: | |
handler(int fd) : | |
mp::wavy::handler(fd), | |
m_pac(64*1024) { } | |
void on_message(int fd, msgpack::object obj, msgpack::zone* z,int id) | |
{ | |
using namespace std; | |
std::cerr << idcolor(id) << "arrive:" << obj << std::endl; | |
msgpack::type::tuple<int,std::string> out(obj); | |
int operation = out.get<0>(); | |
string key = out.get<1>(); | |
switch(operation){ | |
case SET:{ | |
msgpack::type::tuple<int,std::string,std::string> out(obj); | |
string value = out.get<2>(); | |
kvs.insert(pair<string, string>(key,value)); | |
msgpack::sbuffer sbuf; | |
int operation = DONE; | |
msgpack::pack(sbuf, operation); | |
write(fd, sbuf.data(), sbuf.size()); | |
fprintf(stderr,"%sset %s,%s stored ",idcolor(id),key.c_str(),value.c_str()); | |
break; | |
} | |
case GET:{ | |
map<string,string>::iterator it = kvs.find(key); | |
if(it != kvs.end()){ | |
msgpack::vrefbuffer vbuf; | |
msgpack::pack(vbuf, it->second); | |
int operation = FOUND; | |
msgpack::pack(vbuf, operation); | |
const struct iovec* iov = vbuf.vector(); | |
writev(fd, iov, vbuf.vector_size()); | |
fprintf(stderr,"get %s->%s\n",key.c_str(),it->second.c_str()); | |
}else{ | |
msgpack::sbuffer sbuf; | |
int operation = NONE; | |
msgpack::pack(sbuf, operation); | |
write(fd, sbuf.data(), sbuf.size()); | |
fprintf(stderr,"get %s->[no]\n",key.c_str()); | |
} | |
break; | |
} | |
case DELETE:{ | |
msgpack::sbuffer sbuf; | |
map<string,string>::iterator it = kvs.find(key); | |
if(it != kvs.end()){ | |
kvs.erase(it); | |
int operation = DONE; | |
msgpack::pack(sbuf, operation); | |
write(fd, sbuf.data(), sbuf.size()); | |
fprintf(stderr,"delete %s->%s\n",key.c_str(),it->second.c_str()); | |
}else{ | |
int operation = NONE; | |
msgpack::pack(sbuf, operation); | |
write(fd, sbuf.data(), sbuf.size()); | |
fprintf(stderr,"delete %s->[no]\n",key.c_str()); | |
} | |
break; | |
} | |
} | |
} | |
void on_read(mp::wavy::event& e) | |
{ | |
int id = rand() % 1000; | |
fprintf(stderr,"%sread %d start{ ",idcolor(id),id); | |
try{ | |
while(true) { | |
if(m_pac.execute()) { | |
msgpack::object msg = m_pac.data(); | |
std::auto_ptr<msgpack::zone> z( m_pac.release_zone() ); | |
m_pac.reset(); | |
e.more(); //e.next(); | |
fprintf(stderr,"%spassing %d ",idcolor(id),id); | |
//std::cout << "object received: " << msg << std::endl; | |
on_message(fd(), msg, &*z,id); | |
fprintf(stderr,"%s}%d\n",idcolor(id),id); | |
return; | |
} | |
m_pac.reserve_buffer(8*1024); | |
int read_len = ::read(fd(), m_pac.buffer(), m_pac.buffer_capacity()); | |
if(read_len <= 0) { | |
if(read_len == 0) { perror("closed"); throw mp::system_error(errno, "connection closed"); } | |
if(errno == EAGAIN || errno == EINTR) { fprintf(stderr,"%s}%d\n",idcolor(id),id); return; } | |
else { perror("read"); throw mp::system_error(errno, "read error"); } | |
} | |
m_pac.buffer_consumed(read_len); | |
} | |
}catch(...){ | |
perror("exception "); | |
fprintf(stderr,"catched closed! %d\n",id); | |
::close(fd()); | |
} | |
} | |
private: | |
msgpack::unpacker m_pac; | |
}; | |
void on_accepted(int fd, int err, mp::wavy::loop* lo) | |
{ | |
fprintf(stderr,"accept %d %d\n",fd,err); | |
if(fd < 0) { | |
perror("accept error"); | |
exit(1); | |
} | |
try { | |
int on = 1; | |
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)); | |
lo->add_handler<handler>(fd); | |
} catch (...) { | |
fprintf(stderr,"listening socket error"); | |
::close(fd); | |
return; | |
} | |
} | |
int main(void) | |
{ | |
using namespace mp::placeholders; | |
mp::wavy::loop lo; | |
struct sockaddr_in addr; | |
memset(&addr, 0, sizeof(addr)); | |
addr.sin_port = htons(9090); | |
addr.sin_family = AF_INET; | |
addr.sin_addr.s_addr = htonl(INADDR_ANY); | |
lo.listen(PF_INET, SOCK_STREAM, 0, | |
(struct sockaddr*)&addr, sizeof(addr), | |
mp::bind(&on_accepted, _1, _2, &lo)); | |
lo.run(2); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment