Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
POC: Rewrite Tarantool net.box.call in C
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 3f43872ca2e4..edd7ea90b4c7 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -38,13 +38,16 @@
#include "box/iproto_constants.h"
#include "box/lua/tuple.h" /* luamp_convert_tuple() / luamp_convert_key() */
#include "box/xrow.h"
+#include "box/xrow_io.h"
#include "box/tuple.h"
#include "box/execute.h"
#include "lua/msgpack.h"
#include <base64.h>
+#include "assoc.h"
#include "coio.h"
+#include "fiber.h"
#include "box/errcode.h"
#include "lua/fiber.h"
#include "mpstream/mpstream.h"
@@ -884,9 +887,175 @@ netbox_decode_prepare(struct lua_State *L)
return 2;
}
+struct netbox_request {
+ struct fiber *client;
+ bool is_done;
+ char *reply;
+};
+
+struct netbox_handler {
+ int fd;
+ uint64_t next_sync;
+ /* sync -> netbox_request */
+ struct mh_i64ptr_t *requests;
+ struct fiber *reader;
+ struct fiber *writer;
+ struct ibuf send_buf;
+ struct ibuf recv_buf;
+};
+
+static uint32_t CTID_NETBOX_HANDLER_PTR = 0;
+
+static int
+netbox_writer_f(va_list va)
+{
+ struct netbox_handler *h = va_arg(va, struct netbox_handler *);
+ struct ibuf *buf = &h->send_buf;
+ struct ev_io io;
+ coio_create(&io, h->fd);
+ while (true) {
+ size_t used = ibuf_used(buf);
+ if (used == 0) {
+ fiber_sleep(TIMEOUT_INFINITY);
+ continue;
+ }
+ ssize_t rc = coio_write_timeout_noxc(&io, buf->rpos, used,
+ TIMEOUT_INFINITY);
+ if (rc < 0)
+ break;
+ buf->rpos += rc;
+ }
+ return 0;
+}
+
+static int
+netbox_reader_f(va_list va)
+{
+ struct netbox_handler *h = va_arg(va, struct netbox_handler *);
+ struct ibuf *buf = &h->recv_buf;
+ struct mh_i64ptr_t *requests = h->requests;
+ struct ev_io io;
+ coio_create(&io, h->fd);
+ while (true) {
+ struct xrow_header row;
+ if (coio_read_xrow_noxc(&io, buf, &row) != 0)
+ break;
+ mh_int_t k = mh_i64ptr_find(requests, row.sync, NULL);
+ if (k == mh_end(requests))
+ continue;
+ struct netbox_request *req;
+ req = mh_i64ptr_node(requests, k)->val;
+ mh_i64ptr_del(requests, k, NULL);
+ if (row.bodycnt == 1) {
+ const char *data = row.body[0].iov_base;
+ const char *data_end = data + row.body[0].iov_len;
+ mp_decode_map(&data);
+ mp_decode_uint(&data);
+ req->reply = malloc(data_end - data);
+ memcpy(req->reply, data, data_end - data);
+ } else {
+ req->reply = NULL;
+ }
+ req->is_done = true;
+ fiber_wakeup(req->client);
+ }
+ return 0;
+}
+
+static int
+netbox_start_handler(struct lua_State *L)
+{
+ struct netbox_handler *h = malloc(sizeof(*h));
+ h->fd = lua_tointeger(L, 1);
+ h->next_sync = 0;
+ h->requests = mh_i64ptr_new();
+ h->reader = fiber_new("netbox.reader", netbox_reader_f);
+ h->writer = fiber_new("netbox.writer", netbox_writer_f);
+ ibuf_create(&h->send_buf, &cord()->slabc, 16 * 1024);
+ ibuf_create(&h->recv_buf, &cord()->slabc, 16 * 1024);
+ fiber_start(h->reader, h);
+ fiber_start(h->writer, h);
+ *(struct netbox_handler **)
+ luaL_pushcdata(L, CTID_NETBOX_HANDLER_PTR) = h;
+ return 1;
+}
+
+static int
+netbox_execute_call(struct lua_State *L)
+{
+ uint32_t ctypeid;
+ struct netbox_handler *h = *(struct netbox_handler **)
+ luaL_checkcdata(L, 1, &ctypeid);
+
+ uint64_t sync = h->next_sync++;
+ struct ibuf *buf = &h->send_buf;
+ size_t used_before = ibuf_used(buf);
+
+ struct mpstream stream;
+ mpstream_init(&stream, buf, ibuf_reserve_cb, ibuf_alloc_cb,
+ luamp_error, L);
+
+ /* reserve space for fixheader */
+ const size_t fixheader_size = mp_sizeof_uint(UINT32_MAX);
+ mpstream_reserve(&stream, fixheader_size);
+ mpstream_advance(&stream, fixheader_size);
+
+ /* encode header */
+ mpstream_encode_map(&stream, 2);
+ mpstream_encode_uint(&stream, IPROTO_SYNC);
+ mpstream_encode_uint(&stream, sync);
+ mpstream_encode_uint(&stream, IPROTO_REQUEST_TYPE);
+ mpstream_encode_uint(&stream, IPROTO_CALL);
+
+ /* encode data */
+ mpstream_encode_map(&stream, 2);
+ /* encode proc name */
+ size_t name_len;
+ const char *name = lua_tolstring(L, 2, &name_len);
+ mpstream_encode_uint(&stream, IPROTO_FUNCTION_NAME);
+ mpstream_encode_strn(&stream, name, name_len);
+ /* encode args */
+ mpstream_encode_uint(&stream, IPROTO_TUPLE);
+ luamp_encode_tuple(L, cfg, &stream, 3);
+
+ mpstream_flush(&stream);
+ size_t used_after = ibuf_used(buf);
+ size_t size = used_after - used_before;
+ char *data = buf->wpos - size;
+
+ /* store fixheader */
+ char *fixheader = data;
+ *(fixheader++) = 0xce;
+ mp_store_u32(fixheader, size - fixheader_size);
+
+ struct netbox_request req = {
+ .client = fiber(),
+ .is_done = false,
+ };
+ struct mh_i64ptr_node_t n = { sync, &req };
+ mh_i64ptr_put(h->requests, &n, NULL, NULL);
+ fiber_wakeup(h->writer);
+
+ while (!req.is_done)
+ fiber_sleep(TIMEOUT_INFINITY);
+
+ if (req.reply != NULL) {
+ const char *data = req.reply;
+ int count = mp_decode_array(&data);
+ for (int i = 0; i < count; i++)
+ luamp_decode(L, luaL_msgpack_default, &data);
+ free(req.reply);
+ return count;
+ } else
+ return 0;
+}
+
int
luaopen_net_box(struct lua_State *L)
{
+ luaL_cdef(L, "struct netbox_handler;");
+ CTID_NETBOX_HANDLER_PTR = luaL_ctypeid(L, "struct netbox_handler *");
+
static const luaL_Reg net_box_lib[] = {
{ "encode_ping", netbox_encode_ping },
{ "encode_call_16", netbox_encode_call_16 },
@@ -906,6 +1075,8 @@ luaopen_net_box(struct lua_State *L)
{ "decode_select", netbox_decode_select },
{ "decode_execute", netbox_decode_execute },
{ "decode_prepare", netbox_decode_prepare },
+ { "execute_call", netbox_execute_call },
+ { "start_handler", netbox_start_handler },
{ NULL, NULL}
};
/* luaL_register_module polutes _G */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 3878abf21914..534e8ba31e7b 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -258,6 +258,7 @@ local function on_push_sync_default() end
--
local function create_transport(host, port, user, password, callback,
connection, greeting)
+ local transport = {}
-- check / normalize credentials
if user == nil and password ~= nil then
box.error(E_PROC_LUA, 'net.box: user is not defined')
@@ -500,6 +501,7 @@ local function create_transport(host, port, user, password, callback,
connection, greeting =
establish_connection(host, port, callback('fetch_connect_timeout'))
if connection then
+ transport._handler = internal.start_handler(connection:fd())
goto handle_connection
end
timeout = callback('reconnect_timeout')
@@ -869,6 +871,9 @@ local function create_transport(host, port, user, password, callback,
end
iproto_sm = function(schema_version)
+ while true do
+ fiber.sleep(TIMEOUT_INFINITY)
+ end
local err, hdr, body_rpos, body_end = send_and_recv_iproto()
if err then return error_sm(err, hdr) end
dispatch_response_iproto(hdr, body_rpos, body_end)
@@ -896,13 +901,12 @@ local function create_transport(host, port, user, password, callback,
end
end
- return {
- stop = stop,
- start = start,
- wait_state = wait_state,
- perform_request = perform_request,
- perform_async_request = perform_async_request,
- }
+ transport.stop = stop
+ transport.start = start
+ transport.wait_state = wait_state
+ transport.perform_request = perform_request
+ transport.perform_async_request = perform_async_request
+ return transport
end
-- Wrap create_transport, adding auto-stop-on-GC feature.
@@ -1212,6 +1216,9 @@ function remote_methods:_request(method, opts, request_ctx, ...)
end
function remote_methods:ping(opts)
+ if true then
+ return true
+ end
check_remote_arg(self, 'ping')
return (pcall(self._request, self, 'ping', opts))
end
@@ -1228,6 +1235,9 @@ function remote_methods:call_16(func_name, ...)
end
function remote_methods:call(func_name, args, opts)
+ if true then
+ return internal.execute_call(self._transport._handler, func_name, args)
+ end
check_remote_arg(self, 'call')
check_call_args(args)
args = args or {}
diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index 48707982bd21..68de96440d2c 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -62,6 +62,18 @@ coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row)
true);
}
+int
+coio_read_xrow_noxc(struct ev_io *coio, struct ibuf *in,
+ struct xrow_header *row)
+{
+ try {
+ coio_read_xrow(coio, in, row);
+ return 0;
+ } catch (Exception *) {
+ return -1;
+ }
+}
+
void
coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
struct xrow_header *row, ev_tstamp timeout)
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 0eb7a8ace11b..37af7760d09c 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -41,6 +41,10 @@ struct xrow_header;
void
coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row);
+int
+coio_read_xrow_noxc(struct ev_io *coio, struct ibuf *in,
+ struct xrow_header *row);
+
void
coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
struct xrow_header *row, double timeout);
diff --git a/src/version.c b/src/version.c
index 7d2d038de1cf..1ef13277aa14 100644
--- a/src/version.c
+++ b/src/version.c
@@ -40,7 +40,8 @@ tarantool_package(void)
const char *
tarantool_version(void)
{
- return PACKAGE_VERSION;
+ /* to prevent vshard from using net.box futures */
+ return "1.9.0-121-g15cb0bd537b4";
}
uint32_t
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment