Last active
August 29, 2015 14:12
-
-
Save daurnimator/55acef0626e00cf66c16 to your computer and use it in GitHub Desktop.
ngx_lua function that lets a Lua coroutine wait until a file descriptor is ready for reading (or until a timeout expires)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "ngx_http_lua_cqueues.h" | |
#include <math.h> /* HUGE_VAL */ | |
#include <stdlib.h> /* calloc, free */ | |
#include <lua.h> | |
#include <lauxlib.h> | |
#include "ngx_core.h" | |
#include "ngx_http.h" /* ngx_http_request_t, ngx_http_run_posted_requests */ | |
#include "ngx_http_lua_common.h" /* ngx_http_lua_module, ngx_http_lua_co_ctx_t */ | |
/* need to include DDEBUG so that ngx_http_lua_util.h works */ | |
#ifndef DDEBUG | |
#define DDEBUG 0 | |
#endif | |
#include "ddebug.h" | |
#include "ngx_http_lua_util.h" /* ngx_http_lua_get_lua_vm, ngx_http_lua_run_thread, ngx_http_lua_finalize_request */ | |
#include "ngx_http_lua_contentby.h" /* ngx_http_lua_content_wev_handler */ | |
typedef struct { | |
ngx_http_request_t *request; | |
ngx_http_lua_co_ctx_t *co_ctx; | |
ngx_connection_t *conn; | |
} ngx_http_lua_udata_t; | |
/*
 * Resume handler that continues the Lua coroutine selected in
 * ctx->cur_co_ctx after a wait has completed.
 *
 * Returns:
 *   NGX_ERROR  - the request has no ngx_lua module context;
 *   result of ngx_http_lua_run_posted_threads() - the coroutine finished
 *                (NGX_DONE) or yielded again (NGX_AGAIN);
 *   NGX_DONE   - the coroutine failed in the content phase and the request
 *                was finalized with the failure code;
 *   rc         - any other failure code from ngx_http_lua_run_thread().
 */
static ngx_int_t ngx_http_lua_cqueues_resume_request(ngx_http_request_t *r) {
    ngx_http_lua_ctx_t *ctx;
    ngx_connection_t *c;
    lua_State *vm;
    ngx_int_t rc;

    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
    if (ctx == NULL) {
        return NGX_ERROR;
    }

    /* restore normal resume handler */
    ctx->resume_handler = ngx_http_lua_wev_handler;

    /*
     * Remember the connection before running Lua code: finalizing the
     * request below may release `r`, so r->connection must not be read
     * afterwards.
     */
    c = r->connection;

    /* resume lua thread */
    vm = ngx_http_lua_get_lua_vm(r, ctx);
    rc = ngx_http_lua_run_thread(vm, r, ctx, 0 /*nret*/);

    switch (rc) {
    case NGX_DONE: /* coroutine finished */
        ngx_http_lua_finalize_request(r, NGX_DONE);
        /* fall through */
    case NGX_AGAIN: /* coroutine yielded */
        return ngx_http_lua_run_posted_threads(c, vm, r, ctx);
    default: /* NGX_ERROR: coroutine failed */
        if (ctx->entered_content_phase) {
            ngx_http_lua_finalize_request(r, rc);
            return NGX_DONE;
        }
        return rc;
    }
}
/*
 * Handler for the read event that ngx_http_lua_cqueues_wait() registers on
 * its wrapper connection (the same event also carries the optional timeout
 * timer).  Tears down the wait state, marks the waiting coroutine as the
 * current one and hands control to the request's write event handler so the
 * coroutine gets resumed.
 */
static void ngx_http_lua_cqueues_rev_handler(ngx_event_t *ev) {
    ngx_connection_t *conn = ev->data;
    ngx_http_lua_udata_t *u = conn->data;
    ngx_http_request_t *r = u->request;
    ngx_http_lua_co_ctx_t *co_ctx = u->co_ctx;
    /*
     * Save the client connection now: the write event handler invoked below
     * may finalize the request, after which `r` must not be dereferenced.
     */
    ngx_connection_t *c = r->connection;
    ngx_http_lua_ctx_t *ctx;

    /* runs the coroutine's cleanup hook, which releases `u` and `conn` */
    ngx_http_lua_cleanup_pending_operation(co_ctx);

    /* these now point at released objects; clear them to prevent reuse */
    ev = NULL;
    conn = NULL;
    u = NULL;

    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
    if (ctx != NULL) {
        /* set current coroutine to the one that had the event */
        ctx->cur_co_ctx = co_ctx;
        ctx->resume_handler = ngx_http_lua_cqueues_resume_request;

        /* queue/fire off handler */
        r->write_event_handler(r);
    }

    ngx_http_run_posted_requests(c);
}
/*
 * Cleanup hook stored in co_ctx->cleanup by ngx_http_lua_cqueues_wait().
 *
 * Detaches the wrapper connection from the event loop WITHOUT closing the
 * underlying file descriptor (the descriptor belongs to the Lua caller),
 * returns the connection object to nginx, and frees the wait state.
 * On return co_ctx->data is NULL.
 */
static void ngx_http_lua_cqueues_cleanup(ngx_http_lua_co_ctx_t *co_ctx) {
    ngx_http_lua_udata_t *u = co_ctx->data;
    ngx_connection_t *c = u->conn;

    if (c != NULL) {
        /* can't use ngx_close_connection here,
           as it closes the file descriptor unconditionally */

        /* cancel timeout timer */
        if (c->read->timer_set) {
            ngx_del_timer(c->read);
        }

        if (c->fd != -1) {
            /*
             * Remove from mainloop; do not pass a CLOSE flag.
             * ngx_del_conn is a NULL function pointer for event modules
             * that have no per-connection actions, so fall back to removing
             * the read event, as ngx_close_connection() does.
             */
            if (ngx_del_conn) {
                ngx_del_conn(c, 0);
            } else if (c->read->active || c->read->disabled) {
                ngx_del_event(c->read, NGX_READ_EVENT, 0);
            }
        }

        /* delete any pending but not handled events */
#if (NGX_THREADS)
        ngx_mutex_lock(ngx_posted_events_mutex);
#endif
#if defined(nginx_version) && nginx_version >= 1007005
        if (c->read->posted) {
#else
        if (c->read->prev) {
#endif
            ngx_delete_posted_event(c->read);
        }
#if (NGX_THREADS)
        ngx_unlock(&c->lock);
        c->read->locked = 0;
        ngx_mutex_unlock(ngx_posted_events_mutex);
#endif

        /* mark the connection as non-reusable (second argument 0) */
        ngx_reusable_connection(c, 0);

        /*
         * Hand the object back while c->fd is still the real descriptor:
         * on event backends that maintain ngx_cycle->files,
         * ngx_free_connection() indexes that table with c->fd.  Only then
         * invalidate the fd (same order as ngx_close_connection()).
         */
        ngx_free_connection(c);
        c->fd = (ngx_socket_t) -1;

        u->conn = NULL;
    }

    free(u);
    co_ctx->data = NULL;
}
/*
 * Lua: wait([fd [, timeout]])
 *
 * Suspends the calling coroutine until `fd` is reported readable by the
 * nginx event loop and/or `timeout` has elapsed.
 *
 *   fd       optional integer file descriptor; omitted or negative means
 *            "no descriptor, timeout only".
 *   timeout  optional non-negative number; it is multiplied by 1000 and
 *            used as an nginx millisecond timer.  Omitted or math.huge
 *            (HUGE_VAL) means "no timeout".
 *
 * At least one of the two must be supplied.  Raises a Lua error on invalid
 * arguments, when no request/context is available, or when nginx resources
 * cannot be obtained.  On success the function yields with no values.
 */
static int ngx_http_lua_cqueues_wait(lua_State *L) {
    ngx_http_request_t *r;
    ngx_http_lua_ctx_t *ctx;
    ngx_http_lua_co_ctx_t *co_ctx;
    ngx_http_lua_udata_t *u;
    ngx_connection_t *c;
    ngx_socket_t fd = luaL_optint(L, 1, -1); /* -1 is invalid fd */
    double timeout = luaL_optnumber(L, 2, HUGE_VAL); /* default to infinite timeout */

    if (fd < 0 && timeout == HUGE_VAL) {
        return luaL_error(L, "must provide a valid file descriptor or timeout");
    }

    /*
     * Reject negative and NaN timeouts: converting them to the unsigned
     * ngx_msec_t below would be undefined behaviour (and in practice an
     * enormous timer).  The negated comparison is what catches NaN.
     */
    if (!(timeout >= 0)) {
        return luaL_argerror(L, 2, "timeout must be a non-negative number");
    }

    if ((r = ngx_http_lua_get_req(L)) == NULL) {
        return luaL_error(L, "no request found");
    }

    if ((ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module)) == NULL) {
        return luaL_error(L, "no request ctx found");
    }

    if ((co_ctx = ctx->cur_co_ctx) == NULL) {
        return luaL_error(L, "no co ctx found");
    }

    /* allocated with calloc so it can be released with free() in cleanup */
    u = calloc(1, sizeof *u);
    if (u == NULL) {
        return luaL_error(L, "no mem");
    }

    /* always create a connection object (even if fd is < 0);
       its read event also carries the timeout timer */
    c = ngx_get_connection(fd, r->connection->log);
    if (c == NULL) {
        free(u);
        return luaL_error(L, "unable to get nginx connection");
    }
    u->conn = c;

    /* drop whatever operation this coroutine was previously waiting on */
    ngx_http_lua_cleanup_pending_operation(co_ctx);

    c->data = u;
    c->read->handler = ngx_http_lua_cqueues_rev_handler;
    c->read->log = c->log;
    c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

    if (fd >= 0) {
        /* add connection object to objects watched for readability */
        if (ngx_handle_read_event(c->read, NGX_LEVEL_EVENT) != NGX_OK) {
            ngx_free_connection(c);
            free(u);
            return luaL_error(L, "unable to add to nginx main loop");
        }
    }

    u->request = r;
    u->co_ctx = co_ctx;

    /* from here on the coroutine's cleanup hook owns `u` and `c` */
    co_ctx->cleanup = (ngx_http_cleanup_pt)&ngx_http_lua_cqueues_cleanup;
    co_ctx->data = u;

    if (timeout != HUGE_VAL) { /* no point adding an infinite timeout */
        ngx_add_timer(c->read, (ngx_msec_t)(timeout * 1000));
    }

    /* make sure nginx knows what to do next */
    if (ctx->entered_content_phase) {
        r->write_event_handler = ngx_http_lua_content_wev_handler;
    } else {
        r->write_event_handler = ngx_http_core_run_phases;
    }

    return lua_yield(L, 0);
}
/*
 * Installs the wait function as field "wait" of the value that is on top of
 * the Lua stack when this is called (expected to be a table).  The stack is
 * left as it was found.
 */
void ngx_http_lua_inject_cqueues(lua_State *L) {
    static const char field_name[] = "wait";

    lua_pushcfunction(L, ngx_http_lua_cqueues_wait);
    /* the target value sits just below the function that was pushed */
    lua_setfield(L, -2, field_name);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef _NGX_HTTP_LUA_CQUEUES_H_INCLUDED_
#define _NGX_HTTP_LUA_CQUEUES_H_INCLUDED_

#include <lua.h>

/*
 * Adds the cqueues-style "wait" function as a field of the value currently
 * on top of the Lua stack of `L`.
 */
void ngx_http_lua_inject_cqueues(lua_State *L);

#endif /* _NGX_HTTP_LUA_CQUEUES_H_INCLUDED_ */
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment