Skip to content

Instantly share code, notes, and snippets.

@danikin
Last active January 30, 2019 03:24
Show Gist options
  • Save danikin/8bfa33e6dff93ccf211c to your computer and use it in GitHub Desktop.
Save danikin/8bfa33e6dff93ccf211c to your computer and use it in GitHub Desktop.
Asynchronous client for Tarantool with synchronous interface
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <sys/time.h>
#include <tarantool/tarantool.h>
#include <tarantool/tnt_net.h>
#include <tarantool/tnt_opt.h>
#include <pthread.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <cassert>
// An object of this class can be used to connect to tarantool
// and then to issue requests on one connection from different threads.
//
// The key thing about this class is that one object is good to serve all the workload
// from a single machiune because Tarantool allows using one socket in parallel for different
// requests
// Plus this workload will be served extremely efficient because under the hoop inside the
// standard Tarantool c-library all the parallel requests will be packed into a single packet and
// all the parallel responses will come in in a single packet which allows to recude the number
// of system calls at order or magitude or even more
class TarantoolConnection
{
public:
// conn_string - "IP:port"
TarantoolConnection(const char *conn_string);
bool IsConnected() { return connected_; }
// Requests the server for the spercified query then receives a response
int DoQuery(struct tnt_stream *tuple, struct tnt_reply **result);
private:
void SendingThread()
{
int j = (int)time_t(NULL);
struct tnt_stream *tuple = tnt_object(NULL);
// Constantly sending everything from the out queue
while (true)
{
std::deque<tnt_stream*> temp;
{
std::unique_lock<std::mutex> l(out_mutex_);
// Waiting while out_queue is empty
// out_cond should be fired once a queue is not empty
while (out_queue_.empty())
out_cond_.wait(l);
// Copy all the queue to the temp one
// We're doing that under the mutex
// In fact this is extremely fast as it is the swap operation
temp.swap(out_queue_);
}
assert(!temp.empty());
// printf("sending %d requests\n", temp.size());
// Now we don't need a mutex. Just send everything over the network
for (auto i = temp.begin(); i != temp.end(); ++i)
{
tnt_object_add_array(tuple, 2);
tnt_object_add_int(tuple, j);
tnt_object_add_int(tuple, j);
tnt_replace(tnt_, 512, tuple);
++j;
// tnt_flush(tnt_);
tnt_object_reset(tuple);
}
tnt_flush(tnt_);
// tnt_object_reset(tuple);
// Notify the receiveing thread that it can start receive data
// It we did not than it would in an active waiting as read_reply would return 1
send_notify_cond_.notify_all();
} // while
}
void ReceivingThread()
{
struct tnt_reply reply;
tnt_reply_init(&reply);
// Constantly receiving everything and putting it to the in queue
while (true)
{
// Receive everything that we can receive
std::deque<struct tnt_reply*> temp;
/*while (true)
{
struct tnt_reply *reply = tnt_reply_init(NULL);
int r = tnt_->read_reply(tnt_, reply);
if (!r)
break;
temp.push_back(reply);
}*/
// Receive as much as we can but no more than 10000
int r = 0;
int counter;
for (counter = 0; counter < 10000; ++counter)
{
r = tnt_->read_reply(tnt_, &reply);
if (r == 0)
temp.push_back(NULL);
else
break;
}
// printf("counter=%d\n", counter);
// We got the result
if (!temp.empty())
{
// Put it to the in queue under the mutex
std::lock_guard<std::mutex> l(in_mutex_);
in_queue_.insert(in_queue_.end(), temp.begin(), temp.end());
// We're notifying all the threads, but it would be better to notify only those of them
// that waits for the data in just received "temp"
in_cond_.notify_all();
}
// We ain't got any result
else
{
if (r == -1)
fprintf(stderr, "Error receiveing response: r=%d, '%s'\n", r, reply.error);
else
// No result - just wait util it comes in
if (r == 1)
{
// Waiting for a send thread to send data to network
// We have to wait because right now we don't have any data to read from network
std::unique_lock<std::mutex> l(send_notify_mutex_);
send_notify_cond_.wait(l);
}
}
}
}
void TimerThread()
{
int64_t prev_answers = 0, rps;
while (true)
{
sleep(1);
{
std::lock_guard<std::mutex> l(in_mutex_);
rps = num_answers_ - prev_answers;
prev_answers = num_answers_;
}
printf("RPS=%d\n", rps);
fflush(stdout);
}
}
std::deque<struct tnt_reply*> in_queue_;
std::deque<tnt_stream*> out_queue_;
std::mutex in_mutex_, out_mutex_, send_notify_mutex_;
std::condition_variable in_cond_, out_cond_, send_notify_cond_;
std::thread send_thread_, receive_thread_, timer_thread_;
struct tnt_stream *tnt_;
bool connected_;
int64_t num_answers_;
};
TarantoolConnection::TarantoolConnection(const char *conn_string)
{
num_answers_ = 0;
connected_ = false;
tnt_ = tnt_net(NULL);
tnt_set(tnt_, TNT_OPT_URI, conn_string);
if (tnt_connect(tnt_) < 0)
{
fprintf(stderr, "Connection refused on '%s'\n", conn_string);
fflush(stderr);
return;
}
connected_ = true;
// Starting threads
send_thread_ = std::thread( [=] { SendingThread(); });
receive_thread_ = std::thread( [=] { ReceivingThread(); });
timer_thread_ = std::thread( [=] { TimerThread(); });
}
int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **result)
{
// Put the query into outgoing queue
// Protect this queue from multithreading access
{
std::lock_guard<std::mutex> l(out_mutex_);
out_queue_.push_back(tuple);
// Notify the sending thread that it can send data
out_cond_.notify_all();
}
// Now this query should be processed in a background sending thread
// There is something in queue - get this answer to the client
std::unique_lock<std::mutex> l(in_mutex_);
{
// Wait until we have data in the in_queue
while (in_queue_.empty())
in_cond_.wait(l);
assert(!in_queue_.empty());
*result = in_queue_.front();
in_queue_.pop_front();
++num_answers_;
}
// if (!(num_answers_%100000))
// printf("num_answers_=%d\n", num_answers_);
return 1;
}
void DoTest(TarantoolConnection *conn)
{
struct tnt_reply *result;
for (int i = 0; i < 100000000; ++i)
conn->DoQuery(NULL, &result);
}
int main()
{
TarantoolConnection conn("172.31.26.200:3301");
if (!conn.IsConnected())
return 0;
std::thread t[1000];
for (int i = 0;i < 1000;++i)
t[i] = std::thread(&DoTest, &conn);
sleep(1000000);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment