Skip to content

Instantly share code, notes, and snippets.

@ochinchina
Last active August 29, 2015 14:02
Show Gist options
  • Save ochinchina/141f4731bf09a739718b to your computer and use it in GitHub Desktop.
Save ochinchina/141f4731bf09a739718b to your computer and use it in GitHub Desktop.
Implements std::streambuf with Redis as storage. This code depends on hiredis, the Redis C client library.
#include <hiredis.h>
#include <streambuf>
#include <iostream>
#include <cstring>
class redis_streambuf: public std::streambuf {
public:
explicit redis_streambuf( redisContext* context, const char* key, std::size_t buf_size = 2048 )
:context_( context ),
key_( key ),
buf_size_( buf_size ),
buf_( new char[buf_size] ),
redis_str_pos_( 0 )
{
setg( buf_, buf_, buf_ );
setp( buf_, buf_ + buf_size_ );
}
~redis_streambuf() {
redis_flush();
delete []buf_;
}
private:
/**
*@see std::streambuf#underflow()
*/
virtual int_type underflow() {
if( gptr() < egptr() ) { //if still have char in buffer
return traits_type::to_int_type(*gptr());
}
redisReply *reply = (redisReply*)redisCommand( context_, "GETRANGE %b %d %d", key_.data(), key_.length(), redis_str_pos_, redis_str_pos_ + buf_size_ );
if( reply ) {
if( reply->type == REDIS_REPLY_STRING ) {
memcpy( buf_, reply->str, reply->len );
setg( buf_, buf_, buf_ + reply->len );
redis_str_pos_ += reply->len;
} else {
setg( buf_, buf_, buf_ );
}
freeReplyObject(reply);
} else {
setg( buf_, buf_, buf_ );
}
if( gptr() < egptr() ) {
return traits_type::to_int_type(*gptr());
} else {
return traits_type::eof();
}
}
virtual std::streamsize xsputn( const char* buf, std::streamsize n ) {
std::streamsize i = 0;
while( i < n ) {
std::streamsize t = epptr() - pptr();
if( t > ( n - i ) ) {
t = n - i;
}
memcpy( pptr(), &buf[i], t );
pbump( t );
redis_flush();
setp( buf_, buf_ + buf_size_ );
i += t;
}
return n;
}
virtual int overflow (int c ) {
redis_flush();
setp( buf_, buf_ + buf_size_ );
if( c != traits_type::eof() ) {
*pptr() = (char)c;
pbump( 1 );
}
return c;
}
virtual int sync() {
redis_flush();
setp( buf_, buf_ + buf_size_ );
}
long long redis_flush() {
redisReply *reply = (redisReply*)redisCommand( context_, "APPEND %b %b", key_.data(), key_.length(), pbase(), pptr() - pbase() );
if( reply ) {
if( reply->type == REDIS_REPLY_INTEGER ) {
return reply->integer;
}
freeReplyObject( reply );
}
return 0;
}
private:
redisContext* context_;
std::string key_;
std::size_t buf_size_;
char* buf_;
int redis_str_pos_;
};
#include "redis_streambuf.hpp"
#include <stdlib.h>
int main( int argc, char** argv ) {
const char *hostname = (argc > 1) ? argv[1] : "127.0.0.1";
int port = (argc > 2) ? atoi(argv[2]) : 6379;
const char* key = (argc > 3 ) ? argv[3]: "test";
struct timeval timeout = { 1, 500000 }; // 1.5 seconds
redisContext *context = redisConnectWithTimeout(hostname, port, timeout);
std::ostream out( new redis_streambuf( context, key ) );
for( int i = 0; i < 1000; i ++ ) {
out << "hello";
}
std::istream in( new redis_streambuf( context, key, 10 ) );
while( ! in.eof() ) {
std::cout << (char)in.get();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment