Skip to content

Instantly share code, notes, and snippets.

@ochinchina
Created July 14, 2014 02:05
Show Gist options
  • Save ochinchina/4e02eb2c6956c555a2ad to your computer and use it in GitHub Desktop.
Save ochinchina/4e02eb2c6956c555a2ad to your computer and use it in GitHub Desktop.
The zookeeper queue implementation using C API
#include <zookeeper.h>
#include <iostream>
#include <string>
#include <unistd.h>
#include <mutex>
#include <condition_variable>
#include <sstream>
class ZkQueue {
public:
ZkQueue( const std::string& host,
const std::string& root )
:host_( host ),
root_( root ),
connected_( false )
{
handle_ = zookeeper_init( host_.c_str(), watch, 5*1000, 0, this, 0 );
{
std::unique_lock<std::mutex> lock(mutex_);
while( !connected_) {
cond_.wait( lock );
}
}
Stat stat;
if( zoo_exists( handle_, root_.c_str(), false, &stat ) == ZOK ) {
std::cout << root_ << " already exists" << std::endl;
} else {
std::cout << root_ << " does not exist" << std::endl;
if( zoo_create( handle_, root_.c_str(), "simple test", 11, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0 ) == ZOK ) {
std::cout << "create " << root_ << " successfully" << std::endl;
}
}
}
~ZkQueue() {
zookeeper_close( handle_ );
}
void produce( const std::string& msg ) {
std::string path = root_ + "/element";
if( zoo_create( handle_, path.c_str(), msg.c_str(), msg.length(), &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, 0, 0 ) == ZOK ) {
std::cout << "create " << path << " successfully" << std::endl;
}
}
std::string consume() {
for( ; ; ) {
std::unique_lock<std::mutex> lock(mutex_);
String_vector children;
if( zoo_get_children( handle_, root_.c_str(), true, &children ) == ZOK ) {
if( children.count > 0 ) {
int min_seq = atoi( &children.data[0][7] );
int j = 0;
for( int i = 1; i < children.count; i++ ) {
int tmp_seq = atoi( &children.data[i][7] );
if( min_seq > tmp_seq ) {
min_seq = tmp_seq;
j = i;
}
}
std::string delete_path = root_ + "/" + children.data[j];
struct Stat stat;
std::cout << delete_path << std::endl;
char* buf = new char[1024];
int buf_len = 1024;
int data_len = buf_len;
if( zoo_get( handle_, delete_path.c_str(), false, buf, &data_len, &stat ) == ZOK ) {
std::string s( buf, data_len );
delete []buf;
zoo_delete( handle_, delete_path.c_str(), -1 );
return s;
} else {
delete []buf;
std::cout << "fail to get the data of " << delete_path << std::endl;
}
}
}
cond_.wait( lock );
}
}
private:
static void watch(zhandle_t *zh, int type, int state, const char *path,void *watcherCtx ) {
ZkQueue* queue = (ZkQueue*)watcherCtx;
if( state == ZOO_CONNECTED_STATE ) {
if( !queue->connected_ ) {
{
std::unique_lock<std::mutex> lock(queue->mutex_);
queue->connected_ = true;
}
queue->cond_.notify_one();
}
}
}
private:
std::string host_;
std::string root_;
zhandle_t* handle_;
bool connected_;
std::mutex mutex_;
std::condition_variable cond_;
};
int main( int argc, char** argv ) {
ZkQueue queue("127.0.0.1:2181", "/root" );
for( int i = 0; i < 10; i++ ) {
std::ostringstream out;
out << "this is test data " << i;
queue.produce( out.str() );
}
for( ; ; ) {
std::cout << queue.consume() << std::endl;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment