Skip to content

Instantly share code, notes, and snippets.

@daurnimator
Last active August 29, 2015 14:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daurnimator/55acef0626e00cf66c16 to your computer and use it in GitHub Desktop.
Save daurnimator/55acef0626e00cf66c16 to your computer and use it in GitHub Desktop.
ngx lua function to allow waiting for a file descriptor to be ready
#include "ngx_http_lua_cqueues.h"
#include <math.h> /* HUGE_VAL */
#include <stdlib.h> /* calloc, free */
#include <lua.h>
#include <lauxlib.h>
#include "ngx_core.h"
#include "ngx_http.h" /* ngx_http_request_t, ngx_http_run_posted_requests */
#include "ngx_http_lua_common.h" /* ngx_http_lua_module, ngx_http_lua_co_ctx_t */
/* need to include DDEBUG so that ngx_http_lua_util.h works */
#ifndef DDEBUG
#define DDEBUG 0
#endif
#include "ddebug.h"
#include "ngx_http_lua_util.h" /* ngx_http_lua_get_lua_vm, ngx_http_lua_run_thread, ngx_http_lua_finalize_request */
#include "ngx_http_lua_contentby.h" /* ngx_http_lua_content_wev_handler */
/* Per-wait bookkeeping. One instance is calloc()ed by the wait function,
   stored both in the connection's data pointer and in the coroutine
   context's data pointer, and free()d by the coroutine cleanup callback. */
typedef struct {
/* request that called wait; read back by the read-event handler */
ngx_http_request_t *request;
/* coroutine context that yielded in wait; made current again on wake-up */
ngx_http_lua_co_ctx_t *co_ctx;
/* nginx connection object wrapping the watched fd (also carries the timer) */
ngx_connection_t *conn;
} ngx_http_lua_udata_t;
/*
 * Resume handler installed by the read-event handler: runs the Lua coroutine
 * that is current in the request's ngx_lua context (with zero resume values)
 * and translates the outcome into an nginx return code.
 *
 * Returns:
 *   NGX_ERROR  if the request has no ngx_lua module context;
 *   the result of ngx_http_lua_run_posted_threads() when the coroutine
 *              finished (NGX_DONE) or yielded again (NGX_AGAIN);
 *   NGX_DONE   when any other code was produced in the content phase (the
 *              request is finalized with that code first);
 *   otherwise the code returned by ngx_http_lua_run_thread().
 */
static ngx_int_t ngx_http_lua_cqueues_resume_request(ngx_http_request_t *r) {
    ngx_http_lua_ctx_t *ctx;
    ngx_connection_t *c;
    lua_State *vm;
    ngx_int_t rc;

    if ((ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module)) == NULL) {
        return NGX_ERROR;
    }

    /* restore normal resume handler */
    ctx->resume_handler = ngx_http_lua_wev_handler;

    /* Remember the connection before running Lua: finalizing the request
       below may release it, so r->connection must not be read afterwards. */
    c = r->connection;

    /* resume lua thread */
    vm = ngx_http_lua_get_lua_vm(r, ctx);
    rc = ngx_http_lua_run_thread(vm, r, ctx, 0 /*nret*/);

    switch (rc) {
    case NGX_DONE: /* coroutine finished */
        ngx_http_lua_finalize_request(r, NGX_DONE);
        /* fall-through */
    case NGX_AGAIN: /* coroutine yielded */
        return ngx_http_lua_run_posted_threads(c, vm, r, ctx);
    default: /* any other code, e.g. NGX_ERROR: coroutine failed */
        if (ctx->entered_content_phase) {
            ngx_http_lua_finalize_request(r, rc);
            return NGX_DONE;
        }
        return rc;
    }
}
/*
 * Read-event handler for the connection created by the wait function.
 * It is attached to the connection's read event, which is also the event the
 * optional timeout timer is armed on.
 *
 * Tears down the wait state, makes the waiting coroutine current again,
 * installs the resume handler and kicks the request's write event handler so
 * the coroutine gets resumed.
 */
static void ngx_http_lua_cqueues_rev_handler(ngx_event_t *ev) {
    ngx_connection_t *conn = ev->data;
    ngx_http_lua_udata_t *u = conn->data;
    ngx_http_request_t *r = u->request;
    ngx_http_lua_co_ctx_t *co_ctx = u->co_ctx;
    /* Client connection, captured up front: the write event handler below
       may finalize the request, after which r must not be dereferenced. */
    ngx_connection_t *c = r->connection;
    ngx_http_lua_ctx_t *ctx;

    /* runs the coroutine's cleanup callback, which frees u and the
       temporary connection object */
    ngx_http_lua_cleanup_pending_operation(co_ctx);
    ev = NULL, conn = NULL, u = NULL; /* now invalidated */

    if ((ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module)) != NULL) {
        /* set current coroutine to the one that had the event */
        ctx->cur_co_ctx = co_ctx;
        ctx->resume_handler = ngx_http_lua_cqueues_resume_request;
        /* queue/fire off handler */
        r->write_event_handler(r);
    }

    ngx_http_run_posted_requests(c);
}
/*
 * Coroutine cleanup callback for a pending wait.
 *
 * Detaches the temporary connection object from nginx (timer, event loop
 * registration, posted-event queue) WITHOUT closing the caller's file
 * descriptor, returns the connection object to nginx, and frees the wait
 * state stored in co_ctx->data.
 */
static void ngx_http_lua_cqueues_cleanup(ngx_http_lua_co_ctx_t *co_ctx) {
    ngx_http_lua_udata_t *u = co_ctx->data;

    if (u == NULL) {
        /* nothing pending (or already cleaned up) */
        return;
    }

    if (u->conn != NULL) {
        /* can't use ngx_close_connection here,
           as it closes the file descriptor unconditionally */

        /* cancel timeout timer */
        if (u->conn->read->timer_set) {
            ngx_del_timer(u->conn->read);
        }

        if (u->conn->fd != -1) {
            /* remove from mainloop; do not pass CLOSE_SOCKET flag */
            ngx_del_conn(u->conn, 0);
        }

        /* delete any pending but not handled events */
#if (NGX_THREADS)
        ngx_mutex_lock(ngx_posted_events_mutex);
#endif
#if defined(nginx_version) && nginx_version >= 1007005
        if (u->conn->read->posted) {
#else
        if (u->conn->read->prev) {
#endif
            ngx_delete_posted_event(u->conn->read);
        }
#if (NGX_THREADS)
        ngx_unlock(&u->conn->lock);
        u->conn->read->locked = 0;
        ngx_mutex_unlock(ngx_posted_events_mutex);
#endif

        /* mark the connection as not reusable (second argument 0), as
           nginx's own close path does before freeing a connection */
        ngx_reusable_connection(u->conn, 0);

        /* Free BEFORE invalidating fd: ngx_free_connection() uses c->fd to
           clear the connection's slot in ngx_cycle->files (when that table
           is in use), so it must still see the real descriptor.  This is the
           same order ngx_close_connection() uses. */
        ngx_free_connection(u->conn);
        u->conn->fd = -1;
        u->conn = NULL;
    }

    free(u);
    co_ctx->data = NULL;
}
/*
 * Lua: wait([fd [, timeout]])
 *
 * Suspends the calling coroutine until `fd` becomes readable or `timeout`
 * seconds elapse, whichever happens first.
 *
 *   fd       optional integer file descriptor; omitted or negative means
 *            "no descriptor" (pure timeout).
 *   timeout  optional number of seconds; omitted means wait forever.
 *            Negative (or NaN) values are treated as 0.
 *
 * At least one of the two must be supplied. Raises a Lua error if neither
 * is given, if no request/context/coroutine context is available, on
 * allocation failure, or if the descriptor cannot be registered with the
 * nginx event loop. On success the coroutine yields with no values; it is
 * later resumed with no values.
 *
 * The descriptor is never closed by this function or its cleanup.
 */
static int ngx_http_lua_cqueues_wait(lua_State *L) {
    ngx_http_request_t *r;
    ngx_http_lua_ctx_t *ctx;
    ngx_http_lua_co_ctx_t *co_ctx;
    ngx_http_lua_udata_t *u;
    ngx_socket_t fd = luaL_optint(L, 1, -1); /* -1 is invalid fd */
    double timeout = luaL_optnumber(L, 2, HUGE_VAL); /* default to infinite timeout */

    if (fd < 0) {
        /* Canonicalize every negative value to -1: the cleanup callback
           decides whether to unregister the descriptor with `fd != -1`. */
        fd = -1;
    }

    if (fd < 0 && timeout == HUGE_VAL) {
        return luaL_error(L, "must provide a valid file descriptor or timeout");
    }

    if (!(timeout >= 0)) {
        /* Negative or NaN: converting such a double to the unsigned
           ngx_msec_t below would be undefined behaviour; expire at once. */
        timeout = 0;
    }

    if ((r = ngx_http_lua_get_req(L)) == NULL) {
        return luaL_error(L, "no request found");
    }
    if ((ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module)) == NULL) {
        return luaL_error(L, "no request ctx found");
    }
    if ((co_ctx = ctx->cur_co_ctx) == NULL) {
        return luaL_error(L, "no co ctx found");
    }

    if ((u = calloc(1, sizeof *u)) == NULL) {
        return luaL_error(L, "no mem");
    }

    /* always create a connection object (even if fd is < 0):
       its read event also carries the timeout timer */
    if ((u->conn = ngx_get_connection(fd, r->connection->log)) == NULL) {
        free(u);
        return luaL_error(L, "unable to get nginx connection");
    }

    /* drop whatever operation this coroutine had pending before */
    ngx_http_lua_cleanup_pending_operation(co_ctx);

    u->conn->data = u;
    u->conn->read->handler = ngx_http_lua_cqueues_rev_handler;
    u->conn->read->log = u->conn->log;
    u->conn->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

    if (fd >= 0) {
        /* add connection object to objects watched for readability */
        if (ngx_handle_read_event(u->conn->read, NGX_LEVEL_EVENT) != NGX_OK) {
            ngx_free_connection(u->conn);
            free(u);
            return luaL_error(L, "unable to add to nginx main loop");
        }
    }

    u->request = r;
    u->co_ctx = co_ctx;

    /* from here on the wait state is owned by the coroutine context and is
       released through its cleanup callback */
    co_ctx->cleanup = (ngx_http_cleanup_pt)&ngx_http_lua_cqueues_cleanup;
    co_ctx->data = u;

    if (timeout != HUGE_VAL) { /* no point adding an infinite timeout */
        /* seconds -> milliseconds */
        ngx_add_timer(u->conn->read, (ngx_msec_t)(timeout * 1000));
    }

    /* make sure nginx knows what to do next */
    if (ctx->entered_content_phase) {
        r->write_event_handler = ngx_http_lua_content_wev_handler;
    } else {
        r->write_event_handler = ngx_http_core_run_phases;
    }

    return lua_yield(L, 0);
}
/*
 * Installs the wait function into the table that is on top of the Lua stack
 * when this is called, under the key "wait". The stack is left as it was
 * found (the pushed function is consumed by the field assignment).
 */
void ngx_http_lua_inject_cqueues(lua_State *L) {
    /* a plain C function is a C closure with zero upvalues */
    lua_pushcclosure(L, ngx_http_lua_cqueues_wait, 0);

    /* table sits just below the freshly pushed function */
    lua_setfield(L, -2, "wait");
}
/* Public interface of the ngx_lua "wait" extension. */
#ifndef _NGX_HTTP_LUA_CQUEUES_H_INCLUDED_
#define _NGX_HTTP_LUA_CQUEUES_H_INCLUDED_
#include <lua.h>
/* Registers the wait function under the key "wait" in the table that is on
   top of the Lua stack of L; the stack height is unchanged on return. */
void ngx_http_lua_inject_cqueues(lua_State *L);
#endif /* _NGX_HTTP_LUA_CQUEUES_H_INCLUDED_ */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment