Last active
July 8, 2021 14:29
-
-
Save locker/cd357f9482bfd207ffe7df610c4b2fba to your computer and use it in GitHub Desktop.
POC: Rewrite Tarantool net.box.call in C
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c | |
index 3f43872ca2e4..edd7ea90b4c7 100644 | |
--- a/src/box/lua/net_box.c | |
+++ b/src/box/lua/net_box.c | |
@@ -38,13 +38,16 @@ | |
#include "box/iproto_constants.h" | |
#include "box/lua/tuple.h" /* luamp_convert_tuple() / luamp_convert_key() */ | |
#include "box/xrow.h" | |
+#include "box/xrow_io.h" | |
#include "box/tuple.h" | |
#include "box/execute.h" | |
#include "lua/msgpack.h" | |
#include <base64.h> | |
+#include "assoc.h" | |
#include "coio.h" | |
+#include "fiber.h" | |
#include "box/errcode.h" | |
#include "lua/fiber.h" | |
#include "mpstream/mpstream.h" | |
@@ -884,9 +887,175 @@ netbox_decode_prepare(struct lua_State *L) | |
return 2; | |
} | |
+struct netbox_request { | |
+ struct fiber *client; | |
+ bool is_done; | |
+ char *reply; | |
+}; | |
+ | |
+struct netbox_handler { | |
+ int fd; | |
+ uint64_t next_sync; | |
+ /* sync -> netbox_request */ | |
+ struct mh_i64ptr_t *requests; | |
+ struct fiber *reader; | |
+ struct fiber *writer; | |
+ struct ibuf send_buf; | |
+ struct ibuf recv_buf; | |
+}; | |
+ | |
+static uint32_t CTID_NETBOX_HANDLER_PTR = 0; | |
+ | |
+static int | |
+netbox_writer_f(va_list va) | |
+{ | |
+ struct netbox_handler *h = va_arg(va, struct netbox_handler *); | |
+ struct ibuf *buf = &h->send_buf; | |
+ struct ev_io io; | |
+ coio_create(&io, h->fd); | |
+ while (true) { | |
+ size_t used = ibuf_used(buf); | |
+ if (used == 0) { | |
+ fiber_sleep(TIMEOUT_INFINITY); | |
+ continue; | |
+ } | |
+ ssize_t rc = coio_write_timeout_noxc(&io, buf->rpos, used, | |
+ TIMEOUT_INFINITY); | |
+ if (rc < 0) | |
+ break; | |
+ buf->rpos += rc; | |
+ } | |
+ return 0; | |
+} | |
+ | |
+static int | |
+netbox_reader_f(va_list va) | |
+{ | |
+ struct netbox_handler *h = va_arg(va, struct netbox_handler *); | |
+ struct ibuf *buf = &h->recv_buf; | |
+ struct mh_i64ptr_t *requests = h->requests; | |
+ struct ev_io io; | |
+ coio_create(&io, h->fd); | |
+ while (true) { | |
+ struct xrow_header row; | |
+ if (coio_read_xrow_noxc(&io, buf, &row) != 0) | |
+ break; | |
+ mh_int_t k = mh_i64ptr_find(requests, row.sync, NULL); | |
+ if (k == mh_end(requests)) | |
+ continue; | |
+ struct netbox_request *req; | |
+ req = mh_i64ptr_node(requests, k)->val; | |
+ mh_i64ptr_del(requests, k, NULL); | |
+ if (row.bodycnt == 1) { | |
+ const char *data = row.body[0].iov_base; | |
+ const char *data_end = data + row.body[0].iov_len; | |
+ mp_decode_map(&data); | |
+ mp_decode_uint(&data); | |
+ req->reply = malloc(data_end - data); | |
+ memcpy(req->reply, data, data_end - data); | |
+ } else { | |
+ req->reply = NULL; | |
+ } | |
+ req->is_done = true; | |
+ fiber_wakeup(req->client); | |
+ } | |
+ return 0; | |
+} | |
+ | |
+static int | |
+netbox_start_handler(struct lua_State *L) | |
+{ | |
+ struct netbox_handler *h = malloc(sizeof(*h)); | |
+ h->fd = lua_tointeger(L, 1); | |
+ h->next_sync = 0; | |
+ h->requests = mh_i64ptr_new(); | |
+ h->reader = fiber_new("netbox.reader", netbox_reader_f); | |
+ h->writer = fiber_new("netbox.writer", netbox_writer_f); | |
+ ibuf_create(&h->send_buf, &cord()->slabc, 16 * 1024); | |
+ ibuf_create(&h->recv_buf, &cord()->slabc, 16 * 1024); | |
+ fiber_start(h->reader, h); | |
+ fiber_start(h->writer, h); | |
+ *(struct netbox_handler **) | |
+ luaL_pushcdata(L, CTID_NETBOX_HANDLER_PTR) = h; | |
+ return 1; | |
+} | |
+ | |
+static int | |
+netbox_execute_call(struct lua_State *L) | |
+{ | |
+ uint32_t ctypeid; | |
+ struct netbox_handler *h = *(struct netbox_handler **) | |
+ luaL_checkcdata(L, 1, &ctypeid); | |
+ | |
+ uint64_t sync = h->next_sync++; | |
+ struct ibuf *buf = &h->send_buf; | |
+ size_t used_before = ibuf_used(buf); | |
+ | |
+ struct mpstream stream; | |
+ mpstream_init(&stream, buf, ibuf_reserve_cb, ibuf_alloc_cb, | |
+ luamp_error, L); | |
+ | |
+ /* reserve space for fixheader */ | |
+ const size_t fixheader_size = mp_sizeof_uint(UINT32_MAX); | |
+ mpstream_reserve(&stream, fixheader_size); | |
+ mpstream_advance(&stream, fixheader_size); | |
+ | |
+ /* encode header */ | |
+ mpstream_encode_map(&stream, 2); | |
+ mpstream_encode_uint(&stream, IPROTO_SYNC); | |
+ mpstream_encode_uint(&stream, sync); | |
+ mpstream_encode_uint(&stream, IPROTO_REQUEST_TYPE); | |
+ mpstream_encode_uint(&stream, IPROTO_CALL); | |
+ | |
+ /* encode data */ | |
+ mpstream_encode_map(&stream, 2); | |
+ /* encode proc name */ | |
+ size_t name_len; | |
+ const char *name = lua_tolstring(L, 2, &name_len); | |
+ mpstream_encode_uint(&stream, IPROTO_FUNCTION_NAME); | |
+ mpstream_encode_strn(&stream, name, name_len); | |
+ /* encode args */ | |
+ mpstream_encode_uint(&stream, IPROTO_TUPLE); | |
+ luamp_encode_tuple(L, cfg, &stream, 3); | |
+ | |
+ mpstream_flush(&stream); | |
+ size_t used_after = ibuf_used(buf); | |
+ size_t size = used_after - used_before; | |
+ char *data = buf->wpos - size; | |
+ | |
+ /* store fixheader */ | |
+ char *fixheader = data; | |
+ *(fixheader++) = 0xce; | |
+ mp_store_u32(fixheader, size - fixheader_size); | |
+ | |
+ struct netbox_request req = { | |
+ .client = fiber(), | |
+ .is_done = false, | |
+ }; | |
+ struct mh_i64ptr_node_t n = { sync, &req }; | |
+ mh_i64ptr_put(h->requests, &n, NULL, NULL); | |
+ fiber_wakeup(h->writer); | |
+ | |
+ while (!req.is_done) | |
+ fiber_sleep(TIMEOUT_INFINITY); | |
+ | |
+ if (req.reply != NULL) { | |
+ const char *data = req.reply; | |
+ int count = mp_decode_array(&data); | |
+ for (int i = 0; i < count; i++) | |
+ luamp_decode(L, luaL_msgpack_default, &data); | |
+ free(req.reply); | |
+ return count; | |
+ } else | |
+ return 0; | |
+} | |
+ | |
int | |
luaopen_net_box(struct lua_State *L) | |
{ | |
+ luaL_cdef(L, "struct netbox_handler;"); | |
+ CTID_NETBOX_HANDLER_PTR = luaL_ctypeid(L, "struct netbox_handler *"); | |
+ | |
static const luaL_Reg net_box_lib[] = { | |
{ "encode_ping", netbox_encode_ping }, | |
{ "encode_call_16", netbox_encode_call_16 }, | |
@@ -906,6 +1075,8 @@ luaopen_net_box(struct lua_State *L) | |
{ "decode_select", netbox_decode_select }, | |
{ "decode_execute", netbox_decode_execute }, | |
{ "decode_prepare", netbox_decode_prepare }, | |
+ { "execute_call", netbox_execute_call }, | |
+ { "start_handler", netbox_start_handler }, | |
{ NULL, NULL} | |
}; | |
/* luaL_register_module polutes _G */ | |
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua | |
index 3878abf21914..534e8ba31e7b 100644 | |
--- a/src/box/lua/net_box.lua | |
+++ b/src/box/lua/net_box.lua | |
@@ -258,6 +258,7 @@ local function on_push_sync_default() end | |
-- | |
local function create_transport(host, port, user, password, callback, | |
connection, greeting) | |
+ local transport = {} | |
-- check / normalize credentials | |
if user == nil and password ~= nil then | |
box.error(E_PROC_LUA, 'net.box: user is not defined') | |
@@ -500,6 +501,7 @@ local function create_transport(host, port, user, password, callback, | |
connection, greeting = | |
establish_connection(host, port, callback('fetch_connect_timeout')) | |
if connection then | |
+ transport._handler = internal.start_handler(connection:fd()) | |
goto handle_connection | |
end | |
timeout = callback('reconnect_timeout') | |
@@ -869,6 +871,9 @@ local function create_transport(host, port, user, password, callback, | |
end | |
iproto_sm = function(schema_version) | |
+ while true do | |
+ fiber.sleep(TIMEOUT_INFINITY) | |
+ end | |
local err, hdr, body_rpos, body_end = send_and_recv_iproto() | |
if err then return error_sm(err, hdr) end | |
dispatch_response_iproto(hdr, body_rpos, body_end) | |
@@ -896,13 +901,12 @@ local function create_transport(host, port, user, password, callback, | |
end | |
end | |
- return { | |
- stop = stop, | |
- start = start, | |
- wait_state = wait_state, | |
- perform_request = perform_request, | |
- perform_async_request = perform_async_request, | |
- } | |
+ transport.stop = stop | |
+ transport.start = start | |
+ transport.wait_state = wait_state | |
+ transport.perform_request = perform_request | |
+ transport.perform_async_request = perform_async_request | |
+ return transport | |
end | |
-- Wrap create_transport, adding auto-stop-on-GC feature. | |
@@ -1212,6 +1216,9 @@ function remote_methods:_request(method, opts, request_ctx, ...) | |
end | |
function remote_methods:ping(opts) | |
+ if true then | |
+ return true | |
+ end | |
check_remote_arg(self, 'ping') | |
return (pcall(self._request, self, 'ping', opts)) | |
end | |
@@ -1228,6 +1235,9 @@ function remote_methods:call_16(func_name, ...) | |
end | |
function remote_methods:call(func_name, args, opts) | |
+ if true then | |
+ return internal.execute_call(self._transport._handler, func_name, args) | |
+ end | |
check_remote_arg(self, 'call') | |
check_call_args(args) | |
args = args or {} | |
diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc | |
index 48707982bd21..68de96440d2c 100644 | |
--- a/src/box/xrow_io.cc | |
+++ b/src/box/xrow_io.cc | |
@@ -62,6 +62,18 @@ coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row) | |
true); | |
} | |
+int | |
+coio_read_xrow_noxc(struct ev_io *coio, struct ibuf *in, | |
+ struct xrow_header *row) | |
+{ | |
+ try { | |
+ coio_read_xrow(coio, in, row); | |
+ return 0; | |
+ } catch (Exception *) { | |
+ return -1; | |
+ } | |
+} | |
+ | |
void | |
coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in, | |
struct xrow_header *row, ev_tstamp timeout) | |
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h | |
index 0eb7a8ace11b..37af7760d09c 100644 | |
--- a/src/box/xrow_io.h | |
+++ b/src/box/xrow_io.h | |
@@ -41,6 +41,10 @@ struct xrow_header; | |
void | |
coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row); | |
+int | |
+coio_read_xrow_noxc(struct ev_io *coio, struct ibuf *in, | |
+ struct xrow_header *row); | |
+ | |
void | |
coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in, | |
struct xrow_header *row, double timeout); | |
diff --git a/src/version.c b/src/version.c | |
index 7d2d038de1cf..1ef13277aa14 100644 | |
--- a/src/version.c | |
+++ b/src/version.c | |
@@ -40,7 +40,8 @@ tarantool_package(void) | |
const char * | |
tarantool_version(void) | |
{ | |
- return PACKAGE_VERSION; | |
+ /* to prevent vshard from using net.box futures */ | |
+ return "1.9.0-121-g15cb0bd537b4"; | |
} | |
uint32_t |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment