Skip to content

Instantly share code, notes, and snippets.

@kumagi
Created April 24, 2010 10:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kumagi/377572 to your computer and use it in GitHub Desktop.
Save kumagi/377572 to your computer and use it in GitHub Desktop.
msgpack KVSclient sample
#include <stdint.h>
#include <msgpack.hpp>
#include <mp/wavy.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <iostream>
#include <string>
#include <vector>
enum op{
SET,
GET,
DELETE,
FOUND,
DONE,
NONE,
};
mp::wavy::loop* gLoop;
class handler : public mp::wavy::handler{
private:
msgpack::unpacker m_pac;
public:
handler(int fd):
mp::wavy::handler(fd),
m_pac(64*1024){}
void on_message(int fd, msgpack::object obj, msgpack::zone* z){
//msgpack::type::tuple<int> out(obj);
int operation(obj.convert());
switch(operation){
case FOUND:{
msgpack::type::tuple<int,std::string> out(obj);
std::string result = out.get<1>();
fprintf(stderr,"found %s\n",result.c_str());
break;
}
case NONE:
fprintf(stderr,"not found \n");
break;
case DONE:
fprintf(stderr,"ok ");
break;
default:
fprintf(stderr,"illigal message\n");
}
}
void on_read(mp::wavy::event& e){
while(true) {
m_pac.reserve_buffer(8*1024);
int read_len = ::read(fd(), m_pac.buffer(), m_pac.buffer_capacity());
if(read_len <= 0) {
if(read_len == 0) { throw mp::system_error(errno, "connection closed"); }
if(errno == EAGAIN || errno == EINTR) { return; }
else { throw mp::system_error(errno, "read error"); }
}
m_pac.buffer_consumed(read_len);
while(m_pac.execute()) {
msgpack::object msg = m_pac.data();
std::auto_ptr<msgpack::zone> z( m_pac.release_zone() );
m_pac.reset();
e.more(); // e.next();
//std::cout << "object received: " << msg;
on_message(fd(), msg, &*z);
}
}
}
};
void connected(int fd, int err)
{
if(fd < 0) {
errno = err;
perror("connect error");
return;
}
gLoop->add_handler<handler>(fd);
int cnt = 0;
while(1){
try {
fprintf(stderr,"send:k%d ",cnt);
char key[10];
sprintf(key,"k%d",cnt);
msgpack::type::tuple<int,std::string,std::string> kvp(SET,key,"bar");
msgpack::vrefbuffer vbuf;
msgpack::pack(vbuf,kvp);
const struct iovec* iov = vbuf.vector();
int result = writev(fd, iov, vbuf.vector_size());
if(result < 1){
perror("write");
if(errno == EAGAIN || errno == EINTR){
//usleep(10);
continue;
}
break;
}
cnt++;
} catch (...) {
perror("catched write");
}
}
::close(fd);
exit(0);
}
int main(void)
{
using namespace mp::placeholders;
mp::wavy::loop lo;
gLoop = &lo;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_port = htons(9090);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
lo.connect(PF_INET, SOCK_STREAM, 0,
(struct sockaddr*)&addr, sizeof(addr),
0.0, connected);
lo.run(2);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment