-
-
Save crowell/59bfa1bb9f0cda30c48a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
APU_DECLARE(apr_status_t) | |
apr_memcache_multgetp(apr_memcache_t *mc, | |
apr_pool_t *temp_pool, | |
apr_pool_t *data_pool, | |
apr_hash_t *values) | |
{ | |
apr_status_t rv; | |
apr_memcache_server_t* ms; | |
apr_memcache_conn_t* conn; | |
apr_uint32_t hash; | |
apr_size_t written; | |
apr_size_t klen; | |
apr_memcache_value_t* value; | |
apr_hash_index_t* value_hash_index; | |
/* this is a little over aggresive, but beats multiple loops | |
* to figure out how long each vector needs to be per-server. | |
*/ | |
apr_int32_t veclen = 2 + 2 * apr_hash_count(values) - 1; /* get <key>[<space><key>...]\r\n */ | |
apr_int32_t i, j; | |
apr_int32_t queries_sent; | |
apr_int32_t queries_recvd; | |
apr_hash_t * server_queries = apr_hash_make(temp_pool); | |
struct cache_server_query_t* server_query; | |
apr_hash_index_t * query_hash_index; | |
apr_pollset_t* pollset; | |
const apr_pollfd_t* activefds; | |
apr_pollfd_t* pollfds; | |
/* build all the queries */ | |
value_hash_index = apr_hash_first(temp_pool, values); | |
while (value_hash_index) { | |
void *v; | |
apr_hash_this(value_hash_index, NULL, NULL, &v); | |
value = v; | |
value_hash_index = apr_hash_next(value_hash_index); | |
klen = strlen(value->key); | |
hash = apr_memcache_hash(mc, value->key, klen); | |
ms = apr_memcache_find_server_hash(mc, hash); | |
if (ms == NULL) { | |
continue; | |
} | |
server_query = apr_hash_get(server_queries, &ms, sizeof(ms)); | |
if (!server_query) { | |
rv = ms_find_conn(ms, &conn); | |
if (rv != APR_SUCCESS) { | |
apr_memcache_disable_server(mc, ms); | |
value->status = rv; | |
continue; | |
} | |
server_query = apr_pcalloc(temp_pool,sizeof(struct cache_server_query_t)); | |
apr_hash_set(server_queries, &ms, sizeof(ms), server_query); | |
server_query->ms = ms; | |
server_query->conn = conn; | |
server_query->query_vec = apr_pcalloc(temp_pool, sizeof(struct iovec)*veclen); | |
/* set up the first key */ | |
server_query->query_vec[0].iov_base = MC_GET; | |
server_query->query_vec[0].iov_len = MC_GET_LEN; | |
server_query->query_vec[1].iov_base = (void*)(value->key); | |
server_query->query_vec[1].iov_len = klen; | |
server_query->query_vec[2].iov_base = MC_EOL; | |
server_query->query_vec[2].iov_len = MC_EOL_LEN; | |
server_query->query_vec_count = 3; | |
} | |
else { | |
j = server_query->query_vec_count - 1; | |
server_query->query_vec[j].iov_base = MC_WS; | |
server_query->query_vec[j].iov_len = MC_WS_LEN; | |
j++; | |
server_query->query_vec[j].iov_base = (void*)(value->key); | |
server_query->query_vec[j].iov_len = klen; | |
j++; | |
server_query->query_vec[j].iov_base = MC_EOL; | |
server_query->query_vec[j].iov_len = MC_EOL_LEN; | |
j++; | |
server_query->query_vec_count = j; | |
} | |
} | |
/* create polling structures */ | |
pollfds = apr_pcalloc(temp_pool, apr_hash_count(server_queries) * sizeof(apr_pollfd_t)); | |
rv = apr_pollset_create(&pollset, apr_hash_count(server_queries), temp_pool, 0); | |
if (rv != APR_SUCCESS) { | |
query_hash_index = apr_hash_first(temp_pool, server_queries); | |
while (query_hash_index) { | |
void *v; | |
apr_hash_this(query_hash_index, NULL, NULL, &v); | |
server_query = v; | |
query_hash_index = apr_hash_next(query_hash_index); | |
mget_conn_result(TRUE, TRUE, rv, mc, server_query->ms, server_query->conn, | |
server_query, values, server_queries); | |
} | |
return rv; | |
} | |
/* send all the queries */ | |
queries_sent = 0; | |
query_hash_index = apr_hash_first(temp_pool, server_queries); | |
while (query_hash_index) { | |
void *v; | |
apr_hash_this(query_hash_index, NULL, NULL, &v); | |
server_query = v; | |
query_hash_index = apr_hash_next(query_hash_index); | |
conn = server_query->conn; | |
ms = server_query->ms; | |
for (i = 0, rv = APR_SUCCESS; i < veclen && rv == APR_SUCCESS; i += APR_MAX_IOVEC_SIZE) { | |
rv = apr_socket_sendv(conn->sock, &(server_query->query_vec[i]), | |
veclen-i>APR_MAX_IOVEC_SIZE ? APR_MAX_IOVEC_SIZE : veclen-i , &written); | |
} | |
if (rv != APR_SUCCESS) { | |
mget_conn_result(FALSE, FALSE, rv, mc, ms, conn, | |
server_query, values, server_queries); | |
continue; | |
} | |
pollfds[queries_sent].desc_type = APR_POLL_SOCKET; | |
pollfds[queries_sent].reqevents = APR_POLLIN; | |
pollfds[queries_sent].p = temp_pool; | |
pollfds[queries_sent].desc.s = conn->sock; | |
pollfds[queries_sent].client_data = (void *)server_query; | |
apr_pollset_add (pollset, &pollfds[queries_sent]); | |
queries_sent++; | |
} | |
while (queries_sent) { | |
rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT, &queries_recvd, &activefds); | |
if (rv != APR_SUCCESS) { | |
/* timeout */ | |
queries_sent = 0; | |
continue; | |
} | |
for (i = 0; i < queries_recvd; i++) { | |
server_query = activefds[i].client_data; | |
conn = server_query->conn; | |
ms = server_query->ms; | |
rv = get_server_line(conn); | |
if (rv != APR_SUCCESS) { | |
apr_pollset_remove (pollset, &activefds[i]); | |
mget_conn_result(FALSE, FALSE, rv, mc, ms, conn, | |
server_query, values, server_queries); | |
queries_sent--; | |
continue; | |
} | |
if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) { | |
char *key; | |
char *flags; | |
char *length; | |
char *last; | |
char *data; | |
apr_size_t len = 0; | |
key = apr_strtok(conn->buffer, " ", &last); /* just the VALUE, ignore */ | |
key = apr_strtok(NULL, " ", &last); | |
flags = apr_strtok(NULL, " ", &last); | |
length = apr_strtok(NULL, " ", &last); | |
if (length) { | |
len = atoi(length); | |
} | |
value = apr_hash_get(values, key, strlen(key)); | |
if (value) { | |
if (len >= 0) { | |
apr_bucket_brigade *bbb; | |
apr_bucket *e; | |
/* eat the trailing \r\n */ | |
rv = apr_brigade_partition(conn->bb, len+2, &e); | |
if (rv != APR_SUCCESS) { | |
apr_pollset_remove (pollset, &activefds[i]); | |
mget_conn_result(FALSE, FALSE, rv, mc, ms, conn, | |
server_query, values, server_queries); | |
queries_sent--; | |
continue; | |
} | |
bbb = apr_brigade_split(conn->bb, e); | |
rv = apr_brigade_pflatten(conn->bb, &data, &len, data_pool); | |
if (rv != APR_SUCCESS) { | |
apr_pollset_remove (pollset, &activefds[i]); | |
mget_conn_result(FALSE, FALSE, rv, mc, ms, conn, | |
server_query, values, server_queries); | |
queries_sent--; | |
continue; | |
} | |
rv = apr_brigade_destroy(conn->bb); | |
if (rv != APR_SUCCESS) { | |
apr_pollset_remove (pollset, &activefds[i]); | |
mget_conn_result(FALSE, FALSE, rv, mc, ms, conn, | |
server_query, values, server_queries); | |
queries_sent--; | |
continue; | |
} | |
conn->bb = bbb; | |
value->len = len - 2; | |
data[value->len] = '\0'; | |
value->data = data; | |
} | |
value->status = rv; | |
value->flags = atoi(flags); | |
/* stay on the server */ | |
i--; | |
} | |
else { | |
/* TODO: Server Sent back a key I didn't ask for or my | |
* hash is corrupt */ | |
} | |
} | |
else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) { | |
/* this connection is done */ | |
apr_pollset_remove (pollset, &activefds[i]); | |
ms_release_conn(ms, conn); | |
apr_hash_set(server_queries, &ms, sizeof(ms), NULL); | |
queries_sent--; | |
} | |
else { | |
/* unknown reply? */ | |
rv = APR_EGENERAL; | |
} | |
} /* /for */ | |
} /* /while */ | |
query_hash_index = apr_hash_first(temp_pool, server_queries); | |
while (query_hash_index) { | |
void *v; | |
apr_hash_this(query_hash_index, NULL, NULL, &v); | |
server_query = v; | |
query_hash_index = apr_hash_next(query_hash_index); | |
conn = server_query->conn; | |
ms = server_query->ms; | |
mget_conn_result(TRUE, (rv == APR_SUCCESS), rv, mc, ms, conn, | |
server_query, values, server_queries); | |
continue; | |
} | |
apr_pollset_destroy(pollset); | |
apr_pool_clear(temp_pool); | |
return APR_SUCCESS; | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment