Skip to content

Instantly share code, notes, and snippets.

@crowell
Created September 23, 2015 18:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save crowell/59bfa1bb9f0cda30c48a to your computer and use it in GitHub Desktop.
Save crowell/59bfa1bb9f0cda30c48a to your computer and use it in GitHub Desktop.
APU_DECLARE(apr_status_t)
apr_memcache_multgetp(apr_memcache_t *mc,
apr_pool_t *temp_pool,
apr_pool_t *data_pool,
apr_hash_t *values)
{
apr_status_t rv;
apr_memcache_server_t* ms;
apr_memcache_conn_t* conn;
apr_uint32_t hash;
apr_size_t written;
apr_size_t klen;
apr_memcache_value_t* value;
apr_hash_index_t* value_hash_index;
/* this is a little over aggresive, but beats multiple loops
* to figure out how long each vector needs to be per-server.
*/
apr_int32_t veclen = 2 + 2 * apr_hash_count(values) - 1; /* get <key>[<space><key>...]\r\n */
apr_int32_t i, j;
apr_int32_t queries_sent;
apr_int32_t queries_recvd;
apr_hash_t * server_queries = apr_hash_make(temp_pool);
struct cache_server_query_t* server_query;
apr_hash_index_t * query_hash_index;
apr_pollset_t* pollset;
const apr_pollfd_t* activefds;
apr_pollfd_t* pollfds;
/* build all the queries */
value_hash_index = apr_hash_first(temp_pool, values);
while (value_hash_index) {
void *v;
apr_hash_this(value_hash_index, NULL, NULL, &v);
value = v;
value_hash_index = apr_hash_next(value_hash_index);
klen = strlen(value->key);
hash = apr_memcache_hash(mc, value->key, klen);
ms = apr_memcache_find_server_hash(mc, hash);
if (ms == NULL) {
continue;
}
server_query = apr_hash_get(server_queries, &ms, sizeof(ms));
if (!server_query) {
rv = ms_find_conn(ms, &conn);
if (rv != APR_SUCCESS) {
apr_memcache_disable_server(mc, ms);
value->status = rv;
continue;
}
server_query = apr_pcalloc(temp_pool,sizeof(struct cache_server_query_t));
apr_hash_set(server_queries, &ms, sizeof(ms), server_query);
server_query->ms = ms;
server_query->conn = conn;
server_query->query_vec = apr_pcalloc(temp_pool, sizeof(struct iovec)*veclen);
/* set up the first key */
server_query->query_vec[0].iov_base = MC_GET;
server_query->query_vec[0].iov_len = MC_GET_LEN;
server_query->query_vec[1].iov_base = (void*)(value->key);
server_query->query_vec[1].iov_len = klen;
server_query->query_vec[2].iov_base = MC_EOL;
server_query->query_vec[2].iov_len = MC_EOL_LEN;
server_query->query_vec_count = 3;
}
else {
j = server_query->query_vec_count - 1;
server_query->query_vec[j].iov_base = MC_WS;
server_query->query_vec[j].iov_len = MC_WS_LEN;
j++;
server_query->query_vec[j].iov_base = (void*)(value->key);
server_query->query_vec[j].iov_len = klen;
j++;
server_query->query_vec[j].iov_base = MC_EOL;
server_query->query_vec[j].iov_len = MC_EOL_LEN;
j++;
server_query->query_vec_count = j;
}
}
/* create polling structures */
pollfds = apr_pcalloc(temp_pool, apr_hash_count(server_queries) * sizeof(apr_pollfd_t));
rv = apr_pollset_create(&pollset, apr_hash_count(server_queries), temp_pool, 0);
if (rv != APR_SUCCESS) {
query_hash_index = apr_hash_first(temp_pool, server_queries);
while (query_hash_index) {
void *v;
apr_hash_this(query_hash_index, NULL, NULL, &v);
server_query = v;
query_hash_index = apr_hash_next(query_hash_index);
mget_conn_result(TRUE, TRUE, rv, mc, server_query->ms, server_query->conn,
server_query, values, server_queries);
}
return rv;
}
/* send all the queries */
queries_sent = 0;
query_hash_index = apr_hash_first(temp_pool, server_queries);
while (query_hash_index) {
void *v;
apr_hash_this(query_hash_index, NULL, NULL, &v);
server_query = v;
query_hash_index = apr_hash_next(query_hash_index);
conn = server_query->conn;
ms = server_query->ms;
for (i = 0, rv = APR_SUCCESS; i < veclen && rv == APR_SUCCESS; i += APR_MAX_IOVEC_SIZE) {
rv = apr_socket_sendv(conn->sock, &(server_query->query_vec[i]),
veclen-i>APR_MAX_IOVEC_SIZE ? APR_MAX_IOVEC_SIZE : veclen-i , &written);
}
if (rv != APR_SUCCESS) {
mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
server_query, values, server_queries);
continue;
}
pollfds[queries_sent].desc_type = APR_POLL_SOCKET;
pollfds[queries_sent].reqevents = APR_POLLIN;
pollfds[queries_sent].p = temp_pool;
pollfds[queries_sent].desc.s = conn->sock;
pollfds[queries_sent].client_data = (void *)server_query;
apr_pollset_add (pollset, &pollfds[queries_sent]);
queries_sent++;
}
while (queries_sent) {
rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT, &queries_recvd, &activefds);
if (rv != APR_SUCCESS) {
/* timeout */
queries_sent = 0;
continue;
}
for (i = 0; i < queries_recvd; i++) {
server_query = activefds[i].client_data;
conn = server_query->conn;
ms = server_query->ms;
rv = get_server_line(conn);
if (rv != APR_SUCCESS) {
apr_pollset_remove (pollset, &activefds[i]);
mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
server_query, values, server_queries);
queries_sent--;
continue;
}
if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
char *key;
char *flags;
char *length;
char *last;
char *data;
apr_size_t len = 0;
key = apr_strtok(conn->buffer, " ", &last); /* just the VALUE, ignore */
key = apr_strtok(NULL, " ", &last);
flags = apr_strtok(NULL, " ", &last);
length = apr_strtok(NULL, " ", &last);
if (length) {
len = atoi(length);
}
value = apr_hash_get(values, key, strlen(key));
if (value) {
if (len >= 0) {
apr_bucket_brigade *bbb;
apr_bucket *e;
/* eat the trailing \r\n */
rv = apr_brigade_partition(conn->bb, len+2, &e);
if (rv != APR_SUCCESS) {
apr_pollset_remove (pollset, &activefds[i]);
mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
server_query, values, server_queries);
queries_sent--;
continue;
}
bbb = apr_brigade_split(conn->bb, e);
rv = apr_brigade_pflatten(conn->bb, &data, &len, data_pool);
if (rv != APR_SUCCESS) {
apr_pollset_remove (pollset, &activefds[i]);
mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
server_query, values, server_queries);
queries_sent--;
continue;
}
rv = apr_brigade_destroy(conn->bb);
if (rv != APR_SUCCESS) {
apr_pollset_remove (pollset, &activefds[i]);
mget_conn_result(FALSE, FALSE, rv, mc, ms, conn,
server_query, values, server_queries);
queries_sent--;
continue;
}
conn->bb = bbb;
value->len = len - 2;
data[value->len] = '\0';
value->data = data;
}
value->status = rv;
value->flags = atoi(flags);
/* stay on the server */
i--;
}
else {
/* TODO: Server Sent back a key I didn't ask for or my
* hash is corrupt */
}
}
else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
/* this connection is done */
apr_pollset_remove (pollset, &activefds[i]);
ms_release_conn(ms, conn);
apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
queries_sent--;
}
else {
/* unknown reply? */
rv = APR_EGENERAL;
}
} /* /for */
} /* /while */
query_hash_index = apr_hash_first(temp_pool, server_queries);
while (query_hash_index) {
void *v;
apr_hash_this(query_hash_index, NULL, NULL, &v);
server_query = v;
query_hash_index = apr_hash_next(query_hash_index);
conn = server_query->conn;
ms = server_query->ms;
mget_conn_result(TRUE, (rv == APR_SUCCESS), rv, mc, ms, conn,
server_query, values, server_queries);
continue;
}
apr_pollset_destroy(pollset);
apr_pool_clear(temp_pool);
return APR_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment