Skip to content

Instantly share code, notes, and snippets.

@tmthrgd
Created March 8, 2017 10:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tmthrgd/9e2cea5ed61c939b4940f8062dc40193 to your computer and use it in GitHub Desktop.
Save tmthrgd/9e2cea5ed61c939b4940f8062dc40193 to your computer and use it in GitHub Desktop.
diff --git a/ngx_http_ether_module.c b/ngx_http_ether_module.c
index c065b40..82fa67a 100644
--- a/ngx_http_ether_module.c
+++ b/ngx_http_ether_module.c
@@ -106,6 +106,9 @@ typedef struct {
ngx_buf_t tmp_recv;
+ ngx_event_t get_ev;
+ ngx_queue_t sub_gets;
+
ngx_queue_t queue;
} ngx_ether_memc_server_st;
@@ -127,6 +130,10 @@ typedef struct _ngx_ether_memc_op_st {
ngx_buf_t send;
ngx_buf_t recv;
+ int sub_get_noent;
+ ngx_queue_t sub_gets;
+ ngx_queue_t get_queue;
+
ngx_queue_t recv_queue;
ngx_queue_t send_queue;
} ngx_ether_memc_op_st;
@@ -261,8 +268,12 @@ static ngx_uint_t ngx_ether_find_chash_point(ngx_uint_t npoints,
static void ngx_ether_memc_read_handler(ngx_event_t *rev);
static void ngx_ether_memc_write_handler(ngx_event_t *wev);
+static void ngx_ether_multiget_cleanup_handler(void *data);
+static void ngx_ether_multiget_timeout_handler(ngx_event_t *ev);
+
static void ngx_ether_memc_default_op_handler(ngx_ether_memc_op_st *op, void *data);
static void ngx_ether_memc_event_op_handler(ngx_ether_memc_op_st *op, void *data);
+static void ngx_ether_memc_multiget_op_handler(ngx_ether_memc_op_st *op, void *data);
static ngx_ether_memc_op_st *ngx_ether_memc_start_operation(const ngx_ether_peer_st *peer,
protocol_binary_command cmd, const ngx_keyval_t *kv, void *data);
@@ -2057,6 +2068,7 @@ static ngx_int_t ngx_ether_handle_member_resp_body(ngx_connection_t *c, ngx_ethe
ngx_connection_t *sc = NULL;
ngx_peer_connection_t pc;
ngx_ether_memc_op_st *op;
+ ngx_pool_cleanup_t *cln;
#if NGX_DEBUG
ngx_keyval_t kv = {ngx_null_string, ngx_null_string};
#endif /* NGX_DEBUG */
@@ -2276,6 +2288,10 @@ static ngx_int_t ngx_ether_handle_member_resp_body(ngx_connection_t *c, ngx_ethe
q = prev_q;
}
+ if (server->get_ev.timer_set) {
+ ngx_del_timer(&server->get_ev);
+ }
+
if (server->tmp_recv.start) {
ngx_pfree(c->pool, server->tmp_recv.start);
}
@@ -2449,6 +2465,20 @@ static ngx_int_t ngx_ether_handle_member_resp_body(ngx_connection_t *c, ngx_ethe
continue;
}
+ server->get_ev.handler = ngx_ether_multiget_timeout_handler;
+ server->get_ev.data = server;
+ server->get_ev.log = c->log;
+
+ ngx_queue_init(&server->sub_gets);
+
+ cln = ngx_pool_cleanup_add(c->pool, 0);
+ if (!cln) {
+ goto error;
+ }
+
+ cln->handler = ngx_ether_multiget_cleanup_handler;
+ cln->data = &server->get_ev;
+
ngx_queue_init(&server->recv_ops);
ngx_queue_init(&server->send_ops);
@@ -3063,6 +3093,42 @@ static void ngx_ether_memc_write_handler(ngx_event_t *wev)
}
}
+/*
+ * Pool cleanup callback for a memc server's multiget timer.
+ *
+ * `data` is the event registered together with this callback (the server's
+ * get_ev). If that event still has a timer armed, the timer is removed; a
+ * NULL event or an unarmed timer is a no-op.
+ */
+static void ngx_ether_multiget_cleanup_handler(void *data)
+{
+	ngx_event_t *get_ev = data;
+
+	if (!get_ev || !get_ev->timer_set) {
+		return;
+	}
+
+	ngx_del_timer(get_ev);
+}
+
+/*
+ * Timer handler for a memc server's multiget batch; ev->data is the server.
+ *
+ * Starts a GET for the sentinel key on that server, installs
+ * ngx_ether_memc_multiget_op_handler as the operation's handler and moves
+ * the server's pending sub_gets list onto the sentinel operation. The
+ * server's own list is left empty afterwards so a new batch can accumulate.
+ *
+ * If the sentinel operation cannot be started, an error is logged and the
+ * pending list is left untouched on the server.
+ */
+static void ngx_ether_multiget_timeout_handler(ngx_event_t *ev)
+{
+	ngx_ether_memc_server_st *server;
+	ngx_keyval_t kv;
+	ngx_ether_memc_op_st *op;
+
+	server = ev->data;
+
+	ngx_str_set(&kv.key, ":ether-multiget-sentinal");
+	ngx_str_null(&kv.value);
+
+	op = ngx_ether_memc_start_operation(server->peer, PROTOCOL_BINARY_CMD_GET, &kv, server);
+	if (!op) {
+		/* NOTE(review): nothing here re-arms get_ev; confirm pending gets are not left waiting */
+		ngx_log_error(NGX_LOG_ERR, server->peer->log, 0,
+			"sending memc multiget sentinal failed");
+		return;
+	}
+
+	op->handler = ngx_ether_memc_multiget_op_handler;
+
+	/*
+	 * Hand the whole pending list to the sentinel op. ngx_queue_add() relinks
+	 * the nodes onto the new head; unlike copying the old head by value and
+	 * patching its neighbours, it also leaves op->sub_gets a valid empty queue
+	 * when there is nothing to move.
+	 */
+	ngx_queue_init(&op->sub_gets);
+	ngx_queue_add(&op->sub_gets, &server->sub_gets);
+
+	/* the old head still holds stale links after the move; reset it */
+	ngx_queue_init(&server->sub_gets);
+}
+
static void ngx_ether_memc_default_op_handler(ngx_ether_memc_op_st *op, void *data)
{
if (ngx_ether_memc_complete_operation(op, NULL, NULL) != NGX_AGAIN) {
@@ -3077,6 +3143,39 @@ static void ngx_ether_memc_event_op_handler(ngx_ether_memc_op_st *op, void *data
ngx_post_event(ev, &ngx_posted_events);
}
+/*
+ * Handler installed on the multiget sentinel operation.
+ *
+ * Walks the gets linked on op->sub_gets. Each one that is still present on
+ * the server's recv_ops queue is unlinked from recv_ops, flagged with
+ * sub_get_noent and has its own handler invoked. Gets that are no longer on
+ * recv_ops are skipped.
+ *
+ * `data` is unused.
+ */
+static void ngx_ether_memc_multiget_op_handler(ngx_ether_memc_op_st *op, void *data)
+{
+	ngx_queue_t *q1, *q2, *next;
+	ngx_ether_memc_op_st *get_op, *recv_op;
+
+	for (q1 = ngx_queue_head(&op->sub_gets);
+		q1 != ngx_queue_sentinel(&op->sub_gets);
+		q1 = next) {
+		/*
+		 * Read the successor before anything else: the handler invoked below
+		 * completes get_op and may release it, after which q1 must not be
+		 * dereferenced again.
+		 */
+		next = ngx_queue_next(q1);
+
+		get_op = ngx_queue_data(q1, ngx_ether_memc_op_st, get_queue);
+
+		if (get_op == op) {
+			/*
+			 * The sentinel is sent as a GET and may therefore be linked on
+			 * the very list it carries; never re-enter this handler for it.
+			 */
+			continue;
+		}
+
+		/* look for this get among the operations still awaiting a response */
+		recv_op = NULL;
+		for (q2 = ngx_queue_head(&op->server->recv_ops);
+			q2 != ngx_queue_sentinel(&op->server->recv_ops);
+			q2 = ngx_queue_next(q2)) {
+			recv_op = ngx_queue_data(q2, ngx_ether_memc_op_st, recv_queue);
+
+			if (get_op == recv_op) {
+				break;
+			}
+		}
+
+		if (get_op != recv_op) {
+			/* not on recv_ops any more: nothing to do for this get */
+			continue;
+		}
+
+		ngx_queue_remove(&recv_op->recv_queue);
+
+		/* makes ngx_ether_memc_complete_operation report KEY_ENOENT for this get */
+		recv_op->sub_get_noent = 1;
+
+		recv_op->handler(recv_op, recv_op->handler_data);
+	}
+}
+
static ngx_ether_memc_op_st *ngx_ether_memc_start_operation(const ngx_ether_peer_st *peer,
protocol_binary_command cmd, const ngx_keyval_t *kv, void *in_data)
{
@@ -3091,7 +3190,7 @@ static ngx_ether_memc_op_st *ngx_ether_memc_start_operation(const ngx_ether_peer
uint32_t id1;
ngx_ether_memc_server_st *server;
uint32_t hash;
- int is_quiet = 0;
+ int is_quiet = 0, multiget_sentinal = 0;
protocol_binary_request_header *req_hdr;
protocol_binary_request_set *reqs;
const protocol_binary_request_no_extras *in_req = in_data;
@@ -3104,6 +3203,12 @@ static ngx_ether_memc_op_st *ngx_ether_memc_start_operation(const ngx_ether_peer
server = in_data;
in_req = NULL;
+ } else if (cmd == PROTOCOL_BINARY_CMD_GET && ngx_strncmp(kv->key.data, ":ether-multiget-sentinal", kv->key.len) == 0) {
+ multiget_sentinal = 1;
+
+ server = in_data;
+
+ in_req = NULL;
} else {
if (!peer->memc.npoints) {
return NULL;
@@ -3203,7 +3308,13 @@ static ngx_ether_memc_op_st *ngx_ether_memc_start_operation(const ngx_ether_peer
p += ext_len;
req_hdr->request.magic = PROTOCOL_BINARY_REQ;
- req_hdr->request.opcode = cmd;
+
+ if (cmd == PROTOCOL_BINARY_CMD_GET && !multiget_sentinal) {
+ req_hdr->request.opcode = PROTOCOL_BINARY_CMD_GETQ;
+ } else {
+ req_hdr->request.opcode = cmd;
+ }
+
req_hdr->request.keylen = htons(kv->key.len);
req_hdr->request.extlen = ext_len;
req_hdr->request.datatype = PROTOCOL_BINARY_RAW_BYTES;
@@ -3255,6 +3366,14 @@ static ngx_ether_memc_op_st *ngx_ether_memc_start_operation(const ngx_ether_peer
ngx_queue_insert_tail(&server->send_ops, &op->send_queue);
+ if (cmd == PROTOCOL_BINARY_CMD_GET) {
+ ngx_queue_insert_tail(&server->sub_gets, &op->get_queue);
+
+ if (!server->get_ev.timer_set) {
+ ngx_add_timer(&server->get_ev, 100);
+ }
+ }
+
server->c->write->handler(server->c->write);
return op;
@@ -3282,6 +3401,16 @@ static ngx_int_t ngx_ether_memc_complete_operation(const ngx_ether_memc_op_st *o
protocol_binary_response_no_extras *out_res = out_data;
protocol_binary_response_get *resg, *out_resg = out_data;
+ if (op->sub_get_noent) {
+ if (out_res) {
+ out_res->message.header.response.status = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+ //out_res->message.header.response.cas = 0;
+ }
+
+ ngx_log_error(NGX_LOG_DEBUG, op->log, 0, "memcached error %hd: ", PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
+ return NGX_ERROR;
+ }
+
if (!op->recv.start) {
/* memc_read_handler has not yet been invoked for this op */
return NGX_AGAIN;
@@ -3330,7 +3459,7 @@ static ngx_int_t ngx_ether_memc_complete_operation(const ngx_ether_memc_op_st *o
out_res->message.header.response.status = status;
out_res->message.header.response.cas = ntohll(res_hdr->response.cas);
- if (res_hdr->response.opcode == PROTOCOL_BINARY_CMD_GET) {
+ if (res_hdr->response.opcode == PROTOCOL_BINARY_CMD_GETQ) {
resg = (protocol_binary_response_get *)res_hdr;
out_resg->message.body.flags = ntohl(resg->message.body.flags);
}
@@ -3346,7 +3475,7 @@ static ngx_int_t ngx_ether_memc_complete_operation(const ngx_ether_memc_op_st *o
log_level = NGX_LOG_ERR;
- if (res_hdr->response.opcode == PROTOCOL_BINARY_CMD_GET
+ if (res_hdr->response.opcode == PROTOCOL_BINARY_CMD_GETQ
&& status == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
log_level = NGX_LOG_DEBUG;
}
@@ -3369,6 +3498,11 @@ static void ngx_ether_memc_cleanup_operation(ngx_ether_memc_op_st *op)
ngx_queue_remove(&op->send_queue);
}
+ /*if (ngx_queue_prev(&op->get_queue)
+ && ngx_queue_next(ngx_queue_prev(&op->get_queue)) == &op->get_queue) {
+ ngx_queue_remove(&op->get_queue);
+ }*/
+
if (op->send.start) {
ngx_pfree(peer->pool, op->send.start);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment