Skip to content

Instantly share code, notes, and snippets.

@grifx
Last active July 11, 2023 04:29
Show Gist options
  • Save grifx/68c2fb9aaa071daa4bf959ebfe7842ec to your computer and use it in GitHub Desktop.
Save grifx/68c2fb9aaa071daa4bf959ebfe7842ec to your computer and use it in GitHub Desktop.
/*
 * Lua C function: hand the socket's live connection over to the keepalive
 * pool selected by the socket's pool key, then finalize the Lua socket.
 *
 * Lua arguments (1 to 3, including the object):
 *   1. the socket object (a table)
 *   2. optional keepalive timeout in ms; nil/absent falls back to
 *      llcf->keepalive_timeout; 0 means no idle timer is armed
 *   3. optional pool size, consulted only when the pool for this key does
 *      not exist yet; nil/absent falls back to llcf->pool_size
 *
 * Lua returns:
 *   1                   on success (also when the worker is exiting, in which
 *                       case the socket is just finalized instead of pooled)
 *   nil, err            on failure ("closed", "unread data in buffer",
 *                       "invalid connection", "failed to handle read event",
 *                       "key not found", "connection in dubious state")
 *
 * Raises a Lua error on a wrong argument count, a non-table object, a missing
 * or mismatching request, a non-integer timeout/pool size, or a pool size
 * that is <= 0.
 */
static int
ngx_http_lua_socket_tcp_setkeepalive_nats(lua_State *L)
{
    ngx_http_lua_loc_conf_t             *llcf;
    ngx_http_lua_socket_tcp_upstream_t  *u;
    ngx_connection_t                    *c;
    ngx_http_lua_socket_pool_t          *spool;
    ngx_str_t                            key;
    ngx_queue_t                         *q;
    ngx_peer_connection_t               *pc;
    ngx_http_request_t                  *r;
    ngx_msec_t                           timeout;
    ngx_int_t                            pool_size;
    int                                  n;
    ngx_int_t                            rc;
    ngx_buf_t                           *b;
    const char                          *msg;
    ngx_http_lua_socket_pool_item_t     *item;

    n = lua_gettop(L);

    if (n < 1 || n > 3) {
        return luaL_error(L, "expecting 1 to 3 arguments "
                          "(including the object), but got %d", n);
    }

    luaL_checktype(L, 1, LUA_TTABLE);

    lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
    u = lua_touserdata(L, -1);
    lua_pop(L, 1);

    if (u == NULL) {
        lua_pushnil(L);
        lua_pushliteral(L, "closed");
        return 2;
    }

    /* stack: obj timeout? size? */

    pc = &u->peer;
    c = pc->connection;

    if (c == NULL || u->read_closed || u->write_closed) {
        lua_pushnil(L);
        lua_pushliteral(L, "closed");
        return 2;
    }

    r = ngx_http_lua_get_req(L);
    if (r == NULL) {
        return luaL_error(L, "no request found");
    }

    if (u->request != r) {
        return luaL_error(L, "bad request");
    }

    /* NOTE(review): these helpers are defined elsewhere; presumably they bail
     * out of this function when the socket is mid-connect/read/write --
     * confirm against their definitions. */
    ngx_http_lua_socket_check_busy_connecting(r, u, L);
    ngx_http_lua_socket_check_busy_reading(r, u, L);
    ngx_http_lua_socket_check_busy_writing(r, u, L);

    /* refuse to pool a connection that still has buffered, unread bytes */
    b = &u->buffer;

    if (b->start && ngx_buf_size(b)) {
        ngx_http_lua_probe_socket_tcp_setkeepalive_buf_unread(r, u, b->pos,
                                                              b->last - b->pos);

        lua_pushnil(L);
        lua_pushliteral(L, "unread data in buffer");
        return 2;
    }

    if (c->read->eof
        || c->read->error
        || c->read->timedout
        || c->write->error
        || c->write->timedout)
    {
        lua_pushnil(L);
        lua_pushliteral(L, "invalid connection");
        return 2;
    }

    if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
        lua_pushnil(L);
        lua_pushliteral(L, "failed to handle read event");
        return 2;
    }

    if (ngx_terminate || ngx_exiting) {
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                       "lua tcp socket set keepalive while process exiting, "
                       "closing connection %p", c);

        ngx_http_lua_socket_tcp_finalize(r, u);
        lua_pushinteger(L, 1);
        return 1;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "lua tcp socket set keepalive: saving connection %p", c);

    lua_pushlightuserdata(L, ngx_http_lua_lightudata_mask(socket_pool_key));
    lua_rawget(L, LUA_REGISTRYINDEX);

    /* stack: obj timeout? size? pools */

    lua_rawgeti(L, 1, SOCKET_KEY_INDEX);
    key.data = (u_char *) lua_tolstring(L, -1, &key.len);
    if (key.data == NULL) {
        lua_pushnil(L);
        lua_pushliteral(L, "key not found");
        return 2;
    }

    dd("saving connection to key %s", lua_tostring(L, -1));

    lua_pushvalue(L, -1);
    lua_rawget(L, -3);
    spool = lua_touserdata(L, -1);
    lua_pop(L, 1);

    /* stack: obj timeout? size? pools cache_key */

    llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module);

    /* luaL_checkinteger() raises a Lua error (a non-local jump out of this
     * function) when the argument is not a number.  Resolve the timeout here,
     * before any pool or connection state is modified; doing it later would
     * leave the connection linked into the pool with pc->connection already
     * cleared and the keepalive handlers never installed.
     */
    if (n >= 2 && !lua_isnil(L, 2)) {
        timeout = (ngx_msec_t) luaL_checkinteger(L, 2);

    } else {
        timeout = llcf->keepalive_timeout;
    }

    if (spool == NULL) {
        /* create a new socket pool for the current peer key */

        if (n >= 3 && !lua_isnil(L, 3)) {
            pool_size = luaL_checkinteger(L, 3);

        } else {
            pool_size = llcf->pool_size;
        }

        if (pool_size <= 0) {
            msg = lua_pushfstring(L, "bad \"pool_size\" option value: %i",
                                  pool_size);
            return luaL_argerror(L, n, msg);
        }

        ngx_http_lua_socket_tcp_create_socket_pool(L, r, key, pool_size, -1,
                                                   &spool);
    }

    if (ngx_queue_empty(&spool->free)) {
        /* no free slot: recycle the item at the tail of the cache queue,
         * closing the connection it currently holds */

        q = ngx_queue_last(&spool->cache);
        ngx_queue_remove(q);

        item = ngx_queue_data(q, ngx_http_lua_socket_pool_item_t, queue);

        ngx_http_lua_socket_tcp_close_connection(item->connection);

        /* only decrease the counter for connections which were counted */
        if (u->socket_pool != NULL) {
            u->socket_pool->connections--;
        }

    } else {
        q = ngx_queue_head(&spool->free);
        ngx_queue_remove(q);

        item = ngx_queue_data(q, ngx_http_lua_socket_pool_item_t, queue);

        /* we should always increase connections after getting connected,
         * and decrease connections after getting closed.
         * however, we don't create connection pool in previous connect method.
         * so we increase connections here for backward compatibility.
         */
        if (u->socket_pool == NULL) {
            spool->connections++;
        }
    }

    item->connection = c;
    ngx_queue_insert_head(&spool->cache, q);

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "lua tcp socket clear current socket connection");

    /* detach the connection from the Lua socket; the pool item owns it now */
    pc->connection = NULL;

#if 0
    if (u->cleanup) {
        *u->cleanup = NULL;
        u->cleanup = NULL;
    }
#endif

    /* drop any timers left over from the socket's read/write operations */
    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }

    if (c->write->timer_set) {
        ngx_del_timer(c->write);
    }

#if (NGX_DEBUG)
    if (timeout == 0) {
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "lua tcp socket keepalive timeout: unlimited");
    }
#endif

    if (timeout) {
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "lua tcp socket keepalive timeout: %M ms", timeout);

        ngx_add_timer(c->read, timeout);
    }

    c->write->handler = ngx_http_lua_socket_keepalive_dummy_handler;
    c->read->handler = ngx_http_lua_socket_keepalive_rev_handler_nats;

    /* the idle connection no longer belongs to the request, so point it at
     * its pool item and switch all of its logging to the cycle log */
    c->data = item;
    c->idle = 1;
    c->log = ngx_cycle->log;
    c->pool->log = ngx_cycle->log;
    c->read->log = ngx_cycle->log;
    c->write->log = ngx_cycle->log;

    item->socklen = pc->socklen;
    ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);
    item->reused = u->reused;

    if (c->read->ready) {
        /* a read event is already pending on the connection being pooled;
         * run the keepalive close handler on it and report any failure */
        rc = ngx_http_lua_socket_keepalive_close_handler(c->read);
        if (rc != NGX_OK) {
            lua_pushnil(L);
            lua_pushliteral(L, "connection in dubious state");
            return 2;
        }
    }

#if 1
    ngx_http_lua_socket_tcp_finalize(r, u);
#endif

    /* since we set u->peer->connection to NULL previously, the connect
     * operation won't be resumed in the ngx_http_lua_socket_tcp_finalize.
     * Therefore we need to resume it here.
     */
    ngx_http_lua_socket_tcp_resume_conn_op(spool);

    lua_pushinteger(L, 1);
    return 1;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment