// Source: GitHub gist rvncerr/3b4abf65e07794ca3345 (web-page header text removed).
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

#include <tarantool/tarantool.h>
#include <tarantool/tnt_net.h>
#include <tarantool/tnt_opt.h>
// Process-wide debug counters. In the visible code they are only written, and
// only referenced for reading by the commented-out monitor output.
uint64_t A{0};  // incremented by MemcachedConnection::Send by the batch size
uint64_t B{0};  // incremented by MemcachedConnection::Receive per parsed response
uint64_t E{0};  // not modified anywhere in the visible code
/**
 * Base class for a benchmarked key-value store connection.
 *
 * Client threads call DoSyncQuery(), which queues a request in OutputQueue and
 * then blocks until any response is available in InputQueue. Responses are NOT
 * matched to the request that produced them: a caller simply consumes the next
 * queued response.
 *
 * Derived classes implement Send()/Receive() and are responsible for starting
 * SendThread, ReceiveThread and MonitorThread (the base class never starts
 * them). The worker loops never terminate, so an object whose threads were
 * started must not be destroyed: destroying a joinable std::thread calls
 * std::terminate.
 */
class NoSQLConnection {
protected:
    std::list<unsigned int> InputQueue, OutputQueue;
    std::mutex InputMutex, NotifyMutex, OutputMutex;
    std::condition_variable InputCond, NotifyCond, OutputCond;
    std::thread SendThread, ReceiveThread, MonitorThread;
    // Statistics counters. They are written by the client/send/receive threads
    // and read by the monitor thread, hence atomic. Zero-initialized here so a
    // subclass cannot observe indeterminate values.
    std::atomic<uint64_t> in_push{0}, in_pop{0}, out_push{0}, out_pop{0};
    // Snapshot of in_pop from the previous monitor tick (monitor thread only).
    uint64_t in_pop_old = 0;
public:
    virtual ~NoSQLConnection() = default;

    /// Transmits one batch of request keys to the server.
    virtual void Send(const std::list<unsigned int> Value) = 0;
    /// Appends one element to Value per response currently available; may
    /// leave Value untouched when nothing has arrived.
    virtual void Receive(std::list<unsigned int> &Value) = 0;

    /// Sender loop: drains OutputQueue in batches and hands them to Send().
    void SendThreadFunc() {
        while(true) {
            std::list<unsigned int> Batch;
            {
                std::unique_lock<std::mutex> Lock(OutputMutex);
                OutputCond.wait(Lock, [this]{ return !OutputQueue.empty(); });
                Batch.swap(OutputQueue);
            }
            // Remember the size first: the batch is moved into Send()'s
            // by-value parameter to avoid copying the whole list.
            const std::size_t BatchSize = Batch.size();
            Send(std::move(Batch));
            out_pop += BatchSize;
            NotifyCond.notify_all();
        }
    }

    /// Receiver loop: polls Receive() and publishes responses to InputQueue.
    void ReceiveThreadFunc() {
        while(true) {
            std::list<unsigned int> Batch;
            Receive(Batch);
            if(!Batch.empty()) {
                const std::size_t BatchSize = Batch.size();
                std::lock_guard<std::mutex> Lock(InputMutex);
                // splice() relinks the nodes instead of copying them.
                InputQueue.splice(InputQueue.end(), Batch);
                in_push += BatchSize;
                InputCond.notify_all();
            } else {
                // Nothing arrived: back off until the sender reports a new
                // batch or 1 ms has elapsed.
                std::unique_lock<std::mutex> Lock(NotifyMutex);
                NotifyCond.wait_for(Lock, std::chrono::milliseconds(1));
            }
        }
    }

    /// Monitor loop: after 20 one-second ticks prints the average number of
    /// completed queries per second and terminates the whole process.
    void MonitorThreadFunc() {
        uint64_t t = 0;
        while(true) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            // std::cout << t << ":\trps = " << in_pop - in_pop_old;
            // std::cout << "\tavg = " << in_pop/(t+1);
            //std::cout << "\t" << out_push << " -->[ OUT ]--> " << out_pop << ", " << in_push << " -->[ IN ]--> " << in_pop <<
            // " A = " << A << " B = " << B << " E = " << E << std::endl;
            if(t == 19) {
                std::cout << in_pop.load()/(t+1) << std::endl;
                exit(0);
            }
            in_pop_old = in_pop.load();
            t++;
        }
    }

    /**
     * Queues Value for sending and blocks until a response is available.
     *
     * @param Value  request key to enqueue.
     * @param Result receives the front element of InputQueue (the next
     *               response in arrival order, not necessarily the one that
     *               belongs to Value).
     */
    void DoSyncQuery(const unsigned int Value, unsigned int &Result) {
        {
            std::lock_guard<std::mutex> Lock(OutputMutex);
            OutputQueue.push_back(Value);
            out_push++;
            OutputCond.notify_all();
        }
        {
            std::unique_lock<std::mutex> Lock(InputMutex);
            InputCond.wait(Lock, [this]{ return !InputQueue.empty(); });
            Result = InputQueue.front();
            InputQueue.pop_front();
            in_pop++;
        }
    }
};
class TarantoolConnection: public NoSQLConnection { | |
private: | |
struct tnt_stream *Stream, *Tuple; | |
struct tnt_reply Reply; | |
public: | |
virtual void Send(const std::list<unsigned int> Value) { | |
for(auto Iter = Value.begin(); Iter != Value.end(); ++Iter) { | |
tnt_object_add_array(Tuple, 2); | |
tnt_object_add_int(Tuple, *Iter); | |
tnt_object_add_int(Tuple, *Iter); | |
tnt_replace(Stream, 512, Tuple); | |
//tnt_object_add_array(Tuple, 1); | |
//tnt_object_add_int(Tuple, *Iter); | |
//tnt_select(Stream, 512, 0, UINT32_MAX, 0, 0, Tuple); | |
tnt_object_reset(Tuple); | |
} | |
tnt_flush(Stream); | |
} | |
virtual void Receive(std::list<unsigned int> &Value) { | |
while(true) { | |
int r = Stream->read_reply(Stream, &Reply); | |
if(r == 0) Value.push_back(0); else break; | |
} | |
} | |
TarantoolConnection(const char *ConnectionString) { | |
in_push = in_pop = in_pop_old = out_push = out_pop = 0; | |
Stream = tnt_net(NULL); | |
Tuple = tnt_object(NULL); | |
tnt_reply_init(&Reply); | |
tnt_set(Stream, TNT_OPT_URI, ConnectionString); | |
if(tnt_connect(Stream) < 0) { | |
std::cerr << "Connection refused on '" | |
<< ConnectionString << "'." << std::endl; | |
} | |
SendThread = std::thread([=]{SendThreadFunc();}); | |
ReceiveThread = std::thread([=]{ReceiveThreadFunc();}); | |
MonitorThread = std::thread([=]{MonitorThreadFunc();}); | |
} | |
}; | |
class RedisConnection: public NoSQLConnection { | |
private: | |
int fd; | |
public: | |
virtual void Send(const std::list<unsigned int> Value) { | |
static char Query[16*1024*1024]; | |
char *Q = Query; | |
for(auto Iter = Value.begin(); Iter != Value.end(); ++Iter) { | |
char Number[16]; | |
sprintf(Number, "%u", *Iter); | |
unsigned int NumberLen = strlen(Number); | |
//sprintf(Q, "*3\r\n$3\r\nSET\r\n$%u\r\n%s\r\n$%u\r\n%s\r\n", NumberLen, Number, NumberLen, Number); | |
sprintf(Q, "*2\r\n$3\r\nGET\r\n$%u\r\n%s\r\n", NumberLen, Number); | |
Q = Q + strlen(Q); | |
} | |
send(fd, Query, strlen(Query), MSG_NOSIGNAL); | |
} | |
virtual void Receive(std::list<unsigned int> &Value) { | |
int Count; static char Buffer[16*1024*1024]; | |
if((Count = recv(fd, Buffer, 16*1024*1024, MSG_NOSIGNAL)) > 0) { | |
for(unsigned int i = 0; i < Count; i++) { | |
//if(Buffer[i] == '+') { | |
if(Buffer[i] == '$') { | |
Value.push_back(0); | |
} | |
} | |
} else if(Count == -1) { | |
if(errno != EAGAIN) { | |
std::cerr << strerror(errno) << std::endl; | |
exit(1); | |
} | |
} | |
} | |
RedisConnection(const char *ip, const unsigned int port, const char *auth) { | |
in_push = in_pop = in_pop_old = out_push = out_pop = 0; | |
fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); | |
struct sockaddr_in sa; | |
sa.sin_family = AF_INET; | |
sa.sin_port = htons(port); | |
sa.sin_addr.s_addr = inet_addr(ip); | |
if(connect(fd, (struct sockaddr *)(&sa), sizeof(sa)) == -1) { | |
std::cerr << "Connection refused on '" | |
<< ip << ':' << port << "'." << std::endl; | |
} | |
char AuthString[1024]; | |
sprintf(AuthString, "AUTH %s\r\n", auth); | |
send(fd, AuthString, strlen(AuthString), MSG_NOSIGNAL); | |
recv(fd, AuthString, 1, MSG_NOSIGNAL); | |
int flags = fcntl(fd, F_GETFL, 0); | |
fcntl(fd, F_SETFL, flags | O_NONBLOCK); | |
SendThread = std::thread([=]{SendThreadFunc();}); | |
ReceiveThread = std::thread([=]{ReceiveThreadFunc();}); | |
MonitorThread = std::thread([=]{MonitorThreadFunc();}); | |
} | |
}; | |
// Running value written (in network byte order) into the `opaque` field of
// each request header by the MEMCACHED_FILL_* macros; incremented per request.
uint32_t ooo = 0;

// Memcached protocol.

// Fixed-size header that precedes every memcached binary-protocol packet.
// Multi-byte fields are stored in network byte order by the code that fills
// the header.
struct memcached_header_s {
    uint8_t magic;
    uint8_t opcode;
    uint16_t key_length;
    uint8_t extras_length;
    uint8_t data_type;
    union { uint16_t vbucket_id; uint16_t status; };
    uint32_t total_body_length;  // extras + key + value bytes following the header
    uint32_t opaque;
    uint64_t cas;
} __attribute__((packed));
using memcached_header_t = memcached_header_s;

// SET request: header followed by 8 bytes of extras (flags, expiration).
struct memcached_set_request_s {
    memcached_header_t header;
    struct { uint32_t flags; uint32_t expiration; } __attribute__((packed)) extras;
} __attribute__((packed));
using memcached_set_request_t = memcached_set_request_s;

// GET request: header only; the key bytes follow directly.
struct memcached_get_request_s {
    memcached_header_t header;
} __attribute__((packed));
using memcached_get_request_t = memcached_get_request_s;

// These structs are overlaid directly on network buffers, so their sizes must
// match the wire layout exactly.
static_assert(sizeof(memcached_header_t) == 24, "memcached header must be 24 bytes");
static_assert(sizeof(memcached_set_request_t) == 32, "memcached SET request must be 32 bytes");
static_assert(sizeof(memcached_get_request_t) == 24, "memcached GET request must be 24 bytes");
// Fills a memcached binary-protocol SET request (header plus extras) in place.
//   r          - pointer to a memcached_set_request_t-compatible object
//   key_size   - key length in bytes
//   value_size - value length in bytes
// Each expansion consumes one value of the global counter `ooo` for `opaque`.
// The body is wrapped in do { } while(0) so the macro behaves as a single
// statement (safe in an unbraced if/else), and every header byte, including
// data_type, is written so stale buffer contents cannot leak into a request.
#define MEMCACHED_FILL_SET_REQUEST(r, key_size, value_size) \
    do { \
        (r)->header.magic = 0x80; \
        (r)->header.opcode = 0x01; \
        (r)->header.key_length = htons((key_size)); \
        (r)->header.extras_length = 0x08; \
        (r)->header.data_type = 0x00; \
        (r)->header.vbucket_id = 0x0000; \
        (r)->header.total_body_length = htonl(0x08 + (key_size) + (value_size)); \
        (r)->header.opaque = htonl(ooo++); \
        (r)->header.cas = 0x0000000000000000; \
        (r)->extras.flags = 0x00000000; \
        (r)->extras.expiration = 0x00000000; \
    } while(0)

// Fills a memcached binary-protocol GET request header in place.
//   r        - pointer to a memcached_get_request_t-compatible object
//   key_size - key length in bytes
// Each expansion consumes one value of the global counter `ooo` for `opaque`.
#define MEMCACHED_FILL_GET_REQUEST(r, key_size) \
    do { \
        (r)->header.magic = 0x80; \
        (r)->header.opcode = 0x00; \
        (r)->header.key_length = htons((key_size)); \
        (r)->header.extras_length = 0x00; \
        (r)->header.data_type = 0x00; \
        (r)->header.vbucket_id = 0x0000; \
        (r)->header.total_body_length = htonl((key_size)); \
        (r)->header.opaque = htonl(ooo++); \
        (r)->header.cas = 0x0000000000000000; \
    } while(0)
class MemcachedConnection: public NoSQLConnection { | |
private: | |
int fd; | |
public: | |
virtual void Send(const std::list<unsigned int> Value) { | |
static char Query[16*1024*1024]; | |
char *Q = Query; | |
for(auto Iter = Value.begin(); Iter != Value.end(); ++Iter) { | |
char Number[16]; | |
sprintf(Number, "%u", *Iter); | |
unsigned int NumberLen = strlen(Number); | |
/*MEMCACHED_FILL_SET_REQUEST(((memcached_set_request_t*)Q), NumberLen, NumberLen); | |
Q = Q + sizeof(memcached_set_request_t); | |
memcpy(Q, Number, NumberLen); | |
Q = Q + NumberLen; | |
memcpy(Q, Number, NumberLen); | |
Q = Q + NumberLen;*/ | |
MEMCACHED_FILL_GET_REQUEST(((memcached_get_request_t*)Q), NumberLen); | |
Q = Q + sizeof(memcached_get_request_t); | |
memcpy(Q, Number, NumberLen); | |
Q = Q + NumberLen; | |
} | |
send(fd, Query, Q - Query, MSG_NOSIGNAL); | |
A += Value.size(); | |
} | |
virtual void Receive(std::list<unsigned int> &Value) { | |
int Count; | |
static char Buffer[16*1024*1024]; | |
char *Begin = Buffer; | |
char *End = Buffer; | |
if((Count = recv(fd, End, 16*1024*1024, MSG_NOSIGNAL)) > 0) { | |
End += Count; | |
while(true) { | |
memcached_header_t *Helper = (memcached_header_t *)Begin; | |
if(End - Begin >= sizeof(memcached_header_t)) { | |
if(End - Begin >= sizeof(memcached_header_t) + ntohl(Helper->total_body_length)) { | |
Begin = Begin + sizeof(memcached_header_t) + ntohl(Helper->total_body_length); | |
Value.push_back(0); | |
B++; | |
} else break; | |
} else break; | |
} | |
memmove(Buffer, Begin, End-Begin); | |
End = Buffer + (End - Begin); | |
Begin = Buffer; | |
} | |
} | |
MemcachedConnection(const char *ip, const unsigned int port) { | |
in_push = in_pop = in_pop_old = out_push = out_pop = 0; | |
fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); | |
struct sockaddr_in sa; | |
sa.sin_family = AF_INET; | |
sa.sin_port = htons(port); | |
sa.sin_addr.s_addr = inet_addr(ip); | |
if(connect(fd, (struct sockaddr *)(&sa), sizeof(sa)) == -1) { | |
std::cerr << "Connection refused on '" | |
<< ip << ':' << port << "'." << std::endl; | |
} | |
int flags = fcntl(fd, F_GETFL, 0); | |
fcntl(fd, F_SETFL, flags | O_NONBLOCK); | |
SendThread = std::thread([=]{SendThreadFunc();}); | |
ReceiveThread = std::thread([=]{ReceiveThreadFunc();}); | |
MonitorThread = std::thread([=]{MonitorThreadFunc();}); | |
} | |
}; | |
// Body of one benchmark client thread: issues one million synchronous queries
// through the shared connection, using the running index as the key. The
// replies are discarded.
void ClientFunc(NoSQLConnection *Connection) {
    const unsigned int QueryCount = 1000000;
    unsigned int Key = 0;
    while(Key < QueryCount) {
        unsigned int Reply;
        Connection->DoSyncQuery(Key, Reply);
        ++Key;
    }
}
int main(int argc, char **argv) { | |
unsigned int N = 1024; | |
sscanf(argv[1], "%u", &N); | |
//NoSQLConnection *Connection = new TarantoolConnection("13.91.106.28:3301"); | |
NoSQLConnection *Connection = new RedisConnection("13.93.220.164", 6379, "W+KzXgUTUnAo6th95SxE6plyHt3Snb8l4CU8tw3PKco="); | |
//NoSQLConnection *Connection = new MemcachedConnection("13.93.213.230", 11211); | |
std::thread **Clients = new std::thread* [N]; | |
for(unsigned int i = 0; i < N; i++) Clients[i] = new std::thread(ClientFunc, Connection); | |
for(unsigned int i = 0; i < N; i++) Clients[i]->join(); | |
return 0; | |
} | |
// End of gist (web-page footer text removed).