Skip to content

Instantly share code, notes, and snippets.

@rvncerr
Created March 25, 2016 13:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rvncerr/3b4abf65e07794ca3345 to your computer and use it in GitHub Desktop.
Save rvncerr/3b4abf65e07794ca3345 to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <sys/time.h>
#include <tarantool/tarantool.h>
#include <tarantool/tnt_net.h>
#include <tarantool/tnt_opt.h>
#include <pthread.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <string.h>
#include <iostream>
#include <list>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
// Process-wide debug counters (only referenced by MemcachedConnection and by
// the commented-out trace line in the monitor thread).
//   A - grows by the batch size each time MemcachedConnection::Send runs.
//   B - grows by one for every complete response MemcachedConnection::Receive parses.
//   E - never modified by any code visible here.
uint64_t A{0};
uint64_t B{0};
uint64_t E{0};
/**
 * Base class that funnels many synchronous callers through one pipelined
 * connection.
 *
 * Callers enqueue a value with DoSyncQuery(); the send thread drains the
 * output queue in batches through Send(); the receive thread polls Receive()
 * and appends whatever it yields to the input queue, from which blocked
 * callers pop. Derived classes implement Send()/Receive(), and are expected
 * to start SendThread / ReceiveThread / MonitorThread themselves.
 *
 * Locking: OutputMutex guards OutputQueue and out_push; InputMutex guards
 * InputQueue, in_push and in_pop. out_pop is only touched by the send thread
 * and in_pop_old only by the monitor thread.
 */
class NoSQLConnection {
protected:
    std::list<unsigned int> InputQueue, OutputQueue;
    std::mutex InputMutex, NotifyMutex, OutputMutex;
    std::condition_variable InputCond, NotifyCond, OutputCond;
    std::thread SendThread, ReceiveThread, MonitorThread;
    // Traffic counters; zero-initialised here so that a derived class that
    // forgets to reset them still starts from a defined state.
    uint64_t in_push = 0, in_pop = 0, in_pop_old = 0, out_push = 0, out_pop = 0;
public:
    // Virtual so that deleting through a NoSQLConnection* is well defined.
    // NOTE: the worker loops below never return, so destroying an object
    // whose threads are still joinable terminates the program.
    virtual ~NoSQLConnection() = default;

    /** Transmit one batch of queued values to the server. */
    virtual void Send(const std::list<unsigned int> Value) = 0;

    /** Append one element to Value for every response currently available. */
    virtual void Receive(std::list<unsigned int> &Value) = 0;

    /** Send-thread body: wait for queued values, then ship them as one batch. */
    void SendThreadFunc() {
        while(true) {
            std::list<unsigned int> TempQueue;
            {
                std::unique_lock<std::mutex> Lock(OutputMutex);
                while(OutputQueue.empty()) OutputCond.wait(Lock);
                // Take the whole backlog at once so the lock is held briefly.
                TempQueue.swap(OutputQueue);
            }
            Send(TempQueue);
            out_pop += TempQueue.size();
            // Nudge the receive thread out of its idle wait.
            NotifyCond.notify_all();
        }
    }

    /** Receive-thread body: poll Receive() and publish results to callers. */
    void ReceiveThreadFunc() {
        while(true) {
            std::list<unsigned int> TempQueue;
            Receive(TempQueue);
            if(!TempQueue.empty()) {
                std::lock_guard<std::mutex> Lock(InputMutex);
                // Read the size first: splice() leaves TempQueue empty.
                in_push += TempQueue.size();
                InputQueue.splice(InputQueue.end(), TempQueue);
                InputCond.notify_all();
            } else {
                // Nothing arrived: back off for up to 1 ms, or until the
                // send thread signals that a new batch went out.
                std::unique_lock<std::mutex> Lock(NotifyMutex);
                NotifyCond.wait_for(Lock, std::chrono::milliseconds(1));
            }
        }
    }

    /**
     * Monitor-thread body: once per second samples in_pop; on the 20th
     * sample prints the average number of completed queries per second and
     * exits the whole process with status 0.
     */
    void MonitorThreadFunc() {
        uint64_t t = 0;
        while(true) {
            sleep(1);
            uint64_t Popped;
            {
                // in_pop is written by client threads under InputMutex, so
                // it must be read under the same lock to avoid a data race.
                std::lock_guard<std::mutex> Lock(InputMutex);
                Popped = in_pop;
            }
            // std::cout << t << ":\trps = " << in_pop - in_pop_old;
            // std::cout << "\tavg = " << in_pop/(t+1);
            //std::cout << "\t" << out_push << " -->[ OUT ]--> " << out_pop << ", " << in_push << " -->[ IN ]--> " << in_pop <<
            // " A = " << A << " B = " << B << " E = " << E << std::endl;
            if(t == 19) {
                std::cout << Popped/(t+1) << std::endl;
                exit(0);
            }
            in_pop_old = Popped;
            t++;
        }
    }

    /**
     * Queue Value for sending and block until some response is available.
     *
     * Result receives the oldest unclaimed entry of the input queue; since
     * that queue is shared by all callers, it is not necessarily the
     * response to this caller's own Value.
     */
    void DoSyncQuery(const unsigned int Value, unsigned int &Result) {
        {
            std::lock_guard<std::mutex> Lock(OutputMutex);
            OutputQueue.push_back(Value);
            out_push++;
            OutputCond.notify_all();
        }
        {
            std::unique_lock<std::mutex> Lock(InputMutex);
            while(InputQueue.empty()) InputCond.wait(Lock);
            Result = InputQueue.front();
            InputQueue.pop_front();
            in_pop++;
        }
    }
};
class TarantoolConnection: public NoSQLConnection {
private:
struct tnt_stream *Stream, *Tuple;
struct tnt_reply Reply;
public:
virtual void Send(const std::list<unsigned int> Value) {
for(auto Iter = Value.begin(); Iter != Value.end(); ++Iter) {
tnt_object_add_array(Tuple, 2);
tnt_object_add_int(Tuple, *Iter);
tnt_object_add_int(Tuple, *Iter);
tnt_replace(Stream, 512, Tuple);
//tnt_object_add_array(Tuple, 1);
//tnt_object_add_int(Tuple, *Iter);
//tnt_select(Stream, 512, 0, UINT32_MAX, 0, 0, Tuple);
tnt_object_reset(Tuple);
}
tnt_flush(Stream);
}
virtual void Receive(std::list<unsigned int> &Value) {
while(true) {
int r = Stream->read_reply(Stream, &Reply);
if(r == 0) Value.push_back(0); else break;
}
}
TarantoolConnection(const char *ConnectionString) {
in_push = in_pop = in_pop_old = out_push = out_pop = 0;
Stream = tnt_net(NULL);
Tuple = tnt_object(NULL);
tnt_reply_init(&Reply);
tnt_set(Stream, TNT_OPT_URI, ConnectionString);
if(tnt_connect(Stream) < 0) {
std::cerr << "Connection refused on '"
<< ConnectionString << "'." << std::endl;
}
SendThread = std::thread([=]{SendThreadFunc();});
ReceiveThread = std::thread([=]{ReceiveThreadFunc();});
MonitorThread = std::thread([=]{MonitorThreadFunc();});
}
};
/**
 * NoSQLConnection that speaks the Redis wire protocol over a raw TCP socket.
 *
 * Send() pipelines one GET per queued value; Receive() counts one response
 * for every '$' byte found in the data read from the socket.
 */
class RedisConnection: public NoSQLConnection {
private:
    int fd;
    static constexpr size_t kBufferSize = 16*1024*1024;
    // Upper bound for one formatted request (either the GET or the
    // commented-out SET variant with two 10-digit arguments).
    static constexpr size_t kMaxRequestSize = 64;

    /**
     * Write exactly Length bytes. The socket is non-blocking once the
     * constructor has finished, so short writes and EAGAIN are retried;
     * any other error is fatal, mirroring Receive().
     */
    void SendAll(const char *Data, size_t Length) {
        size_t Sent = 0;
        while(Sent < Length) {
            const ssize_t Written = send(fd, Data + Sent, Length - Sent, MSG_NOSIGNAL);
            if(Written >= 0) {
                Sent += static_cast<size_t>(Written);
            } else if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
                std::this_thread::yield();
            } else {
                std::cerr << strerror(errno) << std::endl;
                exit(1);
            }
        }
    }
public:
    /** Format the whole batch into one buffer and transmit it. */
    virtual void Send(const std::list<unsigned int> Value) {
        static char Query[kBufferSize];
        size_t Used = 0;
        for(const unsigned int Key : Value) {
            char Number[16];
            const unsigned int NumberLen =
                static_cast<unsigned int>(snprintf(Number, sizeof(Number), "%u", Key));
            // Flush early rather than run past the end of the buffer.
            if(kBufferSize - Used < kMaxRequestSize) {
                SendAll(Query, Used);
                Used = 0;
            }
            //sprintf(Q, "*3\r\n$3\r\nSET\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n", NumberLen, Number, NumberLen, Number);
            Used += static_cast<size_t>(snprintf(Query + Used, kBufferSize - Used,
                "*2\r\n$3\r\nGET\r\n$%u\r\n%s\r\n", NumberLen, Number));
        }
        // The length is tracked explicitly: the buffer is static, so a
        // strlen() over it could pick up bytes left from an earlier batch.
        SendAll(Query, Used);
    }

    /**
     * Perform one non-blocking read and push a placeholder 0 into Value for
     * every '$' byte received. Exits the process if the peer closed the
     * connection or on any error other than "no data yet".
     */
    virtual void Receive(std::list<unsigned int> &Value) {
        static char Buffer[kBufferSize];
        const ssize_t Count = recv(fd, Buffer, kBufferSize, MSG_NOSIGNAL);
        if(Count > 0) {
            for(ssize_t i = 0; i < Count; i++) {
                //if(Buffer[i] == '+') {
                if(Buffer[i] == '$') {
                    Value.push_back(0);
                }
            }
        } else if(Count == 0) {
            // Orderly shutdown by the server: no further reply can arrive.
            std::cerr << "Connection closed by peer." << std::endl;
            exit(1);
        } else if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
            std::cerr << strerror(errno) << std::endl;
            exit(1);
        }
    }

    /**
     * Connect to ip:port, send "AUTH <auth>", switch the socket to
     * non-blocking mode and start the send, receive and monitor threads.
     * A failed connect is reported on stderr.
     */
    RedisConnection(const char *ip, const unsigned int port, const char *auth) {
        in_push = in_pop = in_pop_old = out_push = out_pop = 0;
        fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        struct sockaddr_in sa;
        memset(&sa, 0, sizeof(sa));   // do not pass stack garbage in the padding
        sa.sin_family = AF_INET;
        sa.sin_port = htons(port);
        sa.sin_addr.s_addr = inet_addr(ip);
        if(connect(fd, (struct sockaddr *)(&sa), sizeof(sa)) == -1) {
            std::cerr << "Connection refused on '"
                      << ip << ':' << port << "'." << std::endl;
        }
        char AuthString[1024];
        const int AuthLen = snprintf(AuthString, sizeof(AuthString), "AUTH %s\r\n", auth);
        if(AuthLen < 0 || static_cast<size_t>(AuthLen) >= sizeof(AuthString)) {
            std::cerr << "AUTH command does not fit into the buffer." << std::endl;
            exit(1);
        }
        SendAll(AuthString, static_cast<size_t>(AuthLen));
        // Only the first byte of the AUTH reply is consumed here; the rest
        // stays in the stream (presumably harmless because Receive() only
        // reacts to '$' bytes - confirm if the counted marker is changed).
        recv(fd, AuthString, 1, MSG_NOSIGNAL);
        int flags = fcntl(fd, F_GETFL, 0);
        fcntl(fd, F_SETFL, flags | O_NONBLOCK);
        SendThread = std::thread([this]{SendThreadFunc();});
        ReceiveThread = std::thread([this]{ReceiveThreadFunc();});
        MonitorThread = std::thread([this]{MonitorThreadFunc();});
    }
};
// Running request counter: each MEMCACHED_FILL_*_REQUEST expansion stores the
// current value (in network byte order) in the request's `opaque` field and
// then increments it.
uint32_t ooo{0};
// Memcached protocol.
// Packed header that precedes every packet handled by MemcachedConnection.
// The fields add up to 24 bytes with no padding. The multi-byte fields are
// filled through htons()/htonl() by the fill macros and read back with
// ntohl() by the receiver.
struct memcached_header_s {
    uint8_t magic;              // the fill macros write 0x80 here
    uint8_t opcode;             // the fill macros write 0x01 for SET, 0x00 for GET
    uint16_t key_length;
    uint8_t extras_length;
    uint8_t data_type;
    // Two names for the same 16-bit slot.
    union { uint16_t vbucket_id; uint16_t status; };
    uint32_t total_body_length; // number of bytes that follow the header
    uint32_t opaque;
    uint64_t cas;
} __attribute__((packed));
typedef struct memcached_header_s memcached_header_t;
// Fixed-size leading part of a SET request: the common header followed by
// eight bytes of extras (flags, expiration). Key and value bytes are appended
// by the caller after this structure.
struct memcached_set_request_s {
    memcached_header_t header;
    struct {
        uint32_t flags;
        uint32_t expiration;
    } __attribute__((packed)) extras;
} __attribute__((packed));
typedef struct memcached_set_request_s memcached_set_request_t;
// Fixed-size leading part of a GET request: just the common header. The key
// bytes are appended by the caller after this structure.
struct memcached_get_request_s {
    memcached_header_t header;
} __attribute__((packed));
typedef struct memcached_get_request_s memcached_get_request_t;
/*
 * Fill the fixed part of a SET request.
 *
 *   r          - pointer to a memcached_set_request_t (evaluated several times)
 *   key_size   - length of the key that the caller appends after the struct
 *   value_size - length of the value that the caller appends after the key
 *
 * Every header and extras field is written, including data_type, so the
 * result does not depend on what the destination memory held before.
 * Stamps the global counter `ooo` into `opaque` and increments it.
 * Wrapped in do { } while (0) so the macro behaves as a single statement
 * (e.g. inside an unbraced if/else); arguments are parenthesised so that
 * expressions such as `a << b` expand correctly.
 */
#define MEMCACHED_FILL_SET_REQUEST(r, key_size, value_size) \
do { \
(r)->header.magic = 0x80; \
(r)->header.opcode = 0x01; \
(r)->header.key_length = htons(key_size); \
(r)->header.extras_length = 0x08; \
(r)->header.data_type = 0x00; \
(r)->header.vbucket_id = 0x0000; \
(r)->header.total_body_length = htonl(0x08 + (key_size) + (value_size)); \
(r)->header.opaque = htonl(ooo++); \
(r)->header.cas = 0x0000000000000000; \
(r)->extras.flags = 0x00000000; \
(r)->extras.expiration = 0x00000000; \
} while(0)
/*
 * Fill the header of a GET request.
 *
 *   r        - pointer to a memcached_get_request_t (evaluated several times)
 *   key_size - length of the key that the caller appends after the struct
 *
 * Every header field is written, including data_type, so the result does
 * not depend on what the destination memory held before. Stamps the global
 * counter `ooo` into `opaque` and increments it. Wrapped in
 * do { } while (0) so the macro behaves as a single statement; arguments are
 * parenthesised so that arbitrary expressions expand correctly.
 */
#define MEMCACHED_FILL_GET_REQUEST(r, key_size) \
do { \
(r)->header.magic = 0x80; \
(r)->header.opcode = 0x00; \
(r)->header.key_length = htons(key_size); \
(r)->header.extras_length = 0x00; \
(r)->header.data_type = 0x00; \
(r)->header.vbucket_id = 0x0000; \
(r)->header.total_body_length = htonl(key_size); \
(r)->header.opaque = htonl(ooo++); \
(r)->header.cas = 0x0000000000000000; \
} while(0)
class MemcachedConnection: public NoSQLConnection {
private:
int fd;
public:
virtual void Send(const std::list<unsigned int> Value) {
static char Query[16*1024*1024];
char *Q = Query;
for(auto Iter = Value.begin(); Iter != Value.end(); ++Iter) {
char Number[16];
sprintf(Number, "%u", *Iter);
unsigned int NumberLen = strlen(Number);
/*MEMCACHED_FILL_SET_REQUEST(((memcached_set_request_t*)Q), NumberLen, NumberLen);
Q = Q + sizeof(memcached_set_request_t);
memcpy(Q, Number, NumberLen);
Q = Q + NumberLen;
memcpy(Q, Number, NumberLen);
Q = Q + NumberLen;*/
MEMCACHED_FILL_GET_REQUEST(((memcached_get_request_t*)Q), NumberLen);
Q = Q + sizeof(memcached_get_request_t);
memcpy(Q, Number, NumberLen);
Q = Q + NumberLen;
}
send(fd, Query, Q - Query, MSG_NOSIGNAL);
A += Value.size();
}
virtual void Receive(std::list<unsigned int> &Value) {
int Count;
static char Buffer[16*1024*1024];
char *Begin = Buffer;
char *End = Buffer;
if((Count = recv(fd, End, 16*1024*1024, MSG_NOSIGNAL)) > 0) {
End += Count;
while(true) {
memcached_header_t *Helper = (memcached_header_t *)Begin;
if(End - Begin >= sizeof(memcached_header_t)) {
if(End - Begin >= sizeof(memcached_header_t) + ntohl(Helper->total_body_length)) {
Begin = Begin + sizeof(memcached_header_t) + ntohl(Helper->total_body_length);
Value.push_back(0);
B++;
} else break;
} else break;
}
memmove(Buffer, Begin, End-Begin);
End = Buffer + (End - Begin);
Begin = Buffer;
}
}
MemcachedConnection(const char *ip, const unsigned int port) {
in_push = in_pop = in_pop_old = out_push = out_pop = 0;
fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
struct sockaddr_in sa;
sa.sin_family = AF_INET;
sa.sin_port = htons(port);
sa.sin_addr.s_addr = inet_addr(ip);
if(connect(fd, (struct sockaddr *)(&sa), sizeof(sa)) == -1) {
std::cerr << "Connection refused on '"
<< ip << ':' << port << "'." << std::endl;
}
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
SendThread = std::thread([=]{SendThreadFunc();});
ReceiveThread = std::thread([=]{ReceiveThreadFunc();});
MonitorThread = std::thread([=]{MonitorThreadFunc();});
}
};
// Body of one client thread: issues one million synchronous queries, using
// the running sequence number 0..999999 as the query value. The result of
// each query is discarded.
void ClientFunc(NoSQLConnection *Connection) {
    const unsigned int QueryCount = 1000000;
    unsigned int Sequence = 0;
    while(Sequence != QueryCount) {
        unsigned int Reply;
        Connection->DoSyncQuery(Sequence, Reply);
        ++Sequence;
    }
}
// Entry point. Usage: <program> [client-thread-count]
//
// Starts N client threads (default 1024) that all issue synchronous queries
// over one shared connection, then waits for them. In practice the
// connection's monitor thread ends the process first, after printing the
// measured throughput.
int main(int argc, char **argv) {
    unsigned int N = 1024;
    // argv[1] is optional; reading it unconditionally crashes when the
    // program is started without arguments.
    if(argc > 1 && sscanf(argv[1], "%u", &N) != 1) {
        std::cerr << "Usage: " << argv[0] << " [client-thread-count]" << std::endl;
        return 1;
    }
    // NOTE(review): server addresses and the Redis password are hard-coded
    // below; consider reading them from the command line or environment.
    // The connection is deliberately never deleted: its worker threads run
    // forever, so destroying the std::thread members would terminate the
    // program.
    //NoSQLConnection *Connection = new TarantoolConnection("13.91.106.28:3301");
    NoSQLConnection *Connection = new RedisConnection("13.93.220.164", 6379, "W+KzXgUTUnAo6th95SxE6plyHt3Snb8l4CU8tw3PKco=");
    //NoSQLConnection *Connection = new MemcachedConnection("13.93.213.230", 11211);
    std::list<std::thread> Clients;
    for(unsigned int i = 0; i < N; i++) Clients.emplace_back(ClientFunc, Connection);
    for(std::thread &Client : Clients) Client.join();
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment