@arekinath
Created May 30, 2014 09:09
diff --git a/c_src/nif.c b/c_src/nif.c
index 7ec1fca..93f111e 100644
--- a/c_src/nif.c
+++ b/c_src/nif.c
@@ -96,10 +96,12 @@ struct cache_queue {
ErlNifUInt64 size; /* sum of node->size for all nodes in the queue */
};
-#define FL_DYING 1
+#define FL_DYING 1
struct atom_node;
+#define N_INCR_BKT 8
+
/* lock ordering: cache_lock then lookup_lock then ctrl_lock */
struct cache {
ErlNifUInt64 max_size; /* these are only set at construction */
@@ -110,7 +112,9 @@ struct cache {
ErlNifUInt64 miss;
int flags;
- TAILQ_HEAD(cache_incr_q, cache_incr_node) incr_head;
+ TAILQ_HEAD(cache_incr_q, cache_incr_node) incr_head[N_INCR_BKT];
+ ErlNifMutex *incr_lock[N_INCR_BKT];
+
int incr_count;
ErlNifMutex *ctrl_lock;
ErlNifCond *check_cond;
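Note on the hunks above: they replace the cache's single promotion queue, which was guarded by ctrl_lock, with N_INCR_BKT (8) independent queues, each protected by its own mutex. Previously every cache hit had to take ctrl_lock to enqueue a promotion record, so concurrent readers serialized on one lock; with per-bucket locks, threads that hash to different buckets no longer contend. A minimal sketch of the sharded-queue layout, assuming pthreads and sys/queue.h as stand-ins for the enif_mutex_* wrappers (the names N_SHARDS, item, and sharded_q are illustrative, not from the patch):

#include <pthread.h>
#include <sys/queue.h>

#define N_SHARDS 8                        /* stands in for N_INCR_BKT */

struct item {
    TAILQ_ENTRY(item) entry;
};

struct sharded_q {
    TAILQ_HEAD(shard_head, item) head[N_SHARDS];  /* one queue per shard */
    pthread_mutex_t lock[N_SHARDS];               /* one mutex per shard */
};

static void
sharded_q_init(struct sharded_q *q)
{
    for (int i = 0; i < N_SHARDS; ++i) {
        TAILQ_INIT(&q->head[i]);
        pthread_mutex_init(&q->lock[i], NULL);
    }
}
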
@@ -199,6 +203,7 @@ static void
destroy_cache_node(struct cache_node *n)
{
struct cache_incr_node *in, *nextin;
+ int i;
TAILQ_REMOVE(&(n->q->head), n, entry);
n->q->size -= n->size;
@@ -207,15 +212,19 @@ destroy_cache_node(struct cache_node *n)
if (n->expiry.tv_sec != 0)
RB_REMOVE(expiry_tree, &(n->c->expiry_head), n);
- nextin = TAILQ_FIRST(&(n->c->incr_head));
- while ((in = nextin)) {
- nextin = TAILQ_NEXT(in, entry);
- if (in->node == n) {
- TAILQ_REMOVE(&(n->c->incr_head), in, entry);
- --(n->c->incr_count);
- in->node = 0;
- enif_free(in);
+ for (i = 0; i < N_INCR_BKT; ++i) {
+ enif_mutex_lock(n->c->incr_lock[i]);
+ nextin = TAILQ_FIRST(&(n->c->incr_head[i]));
+ while ((in = nextin)) {
+ nextin = TAILQ_NEXT(in, entry);
+ if (in->node == n) {
+ TAILQ_REMOVE(&(n->c->incr_head[i]), in, entry);
+ __sync_sub_and_fetch(&(n->c->incr_count), 1);
+ in->node = 0;
+ enif_free(in);
+ }
}
+ enif_mutex_unlock(n->c->incr_lock[i]);
}
n->c = NULL;
@@ -230,6 +239,7 @@ static void *
cache_bg_thread(void *arg)
{
struct cache *c = (struct cache *)arg;
+ int i;
while (1) {
enif_mutex_lock(c->ctrl_lock);
@@ -251,33 +261,39 @@ cache_bg_thread(void *arg)
enif_mutex_lock(c->ctrl_lock);
/* first process the promotion queue before we do any evicting */
- while (!TAILQ_EMPTY(&(c->incr_head))) {
- struct cache_incr_node *n;
- n = TAILQ_FIRST(&(c->incr_head));
- TAILQ_REMOVE(&(c->incr_head), n, entry);
- --(c->incr_count);
-
- /* let go of the ctrl_lock here, we don't need it when we aren't looking
- at the incr_queue, and this way other threads can use it while we shuffle
- queue nodes around */
- enif_mutex_unlock(c->ctrl_lock);
+ for (i = 0; i < N_INCR_BKT; ++i) {
+ enif_mutex_lock(c->incr_lock[i]);
+ while (!TAILQ_EMPTY(&(c->incr_head[i]))) {
+ struct cache_incr_node *n;
+ n = TAILQ_FIRST(&(c->incr_head[i]));
+ TAILQ_REMOVE(&(c->incr_head[i]), n, entry);
+ __sync_sub_and_fetch(&(c->incr_count), 1);
+
+ /* let go of the ctrl_lock here, we don't need it when we aren't looking
+ at the incr_queue, and this way other threads can use it while we shuffle
+ queue nodes around */
+ enif_mutex_unlock(c->incr_lock[i]);
+ enif_mutex_unlock(c->ctrl_lock);
+
+ if (n->node->q == &(c->q1)) {
+ TAILQ_REMOVE(&(c->q1.head), n->node, entry);
+ c->q1.size -= n->node->size;
+ TAILQ_INSERT_HEAD(&(c->q2.head), n->node, entry);
+ n->node->q = &(c->q2);
+ c->q2.size += n->node->size;
+
+ } else if (n->node->q == &(c->q2)) {
+ TAILQ_REMOVE(&(c->q2.head), n->node, entry);
+ TAILQ_INSERT_HEAD(&(c->q2.head), n->node, entry);
+ }
- if (n->node->q == &(c->q1)) {
- TAILQ_REMOVE(&(c->q1.head), n->node, entry);
- c->q1.size -= n->node->size;
- TAILQ_INSERT_HEAD(&(c->q2.head), n->node, entry);
- n->node->q = &(c->q2);
- c->q2.size += n->node->size;
+ enif_free(n);
- } else if (n->node->q == &(c->q2)) {
- TAILQ_REMOVE(&(c->q2.head), n->node, entry);
- TAILQ_INSERT_HEAD(&(c->q2.head), n->node, entry);
+ /* take the ctrl_lock back again for the next loop around */
+ enif_mutex_lock(c->ctrl_lock);
+ enif_mutex_lock(c->incr_lock[i]);
}
-
- enif_free(n);
-
- /* take the ctrl_lock back again for the next loop around */
- enif_mutex_lock(c->ctrl_lock);
+ enif_mutex_unlock(c->incr_lock[i]);
}
/* let go of the ctrl_lock here for two reasons:
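Note on the loop above: the old drain is wrapped in a per-bucket sweep, and the lock discipline is worth spelling out. A node is popped with both ctrl_lock and the bucket lock held; both are released while the node is shuffled between the q1/q2 LRU queues (the carried-over comment still mentions only ctrl_lock, but the bucket lock is dropped too); then both are retaken in the same order, ctrl_lock first and then the bucket lock, so the lock order never inverts. A miniature of that drop-and-reacquire pattern, reusing the types from the earlier sketch (outer, drain, and do_work are hypothetical names; do_work stands in for the queue moves and the enif_free):

static pthread_mutex_t outer = PTHREAD_MUTEX_INITIALIZER;

static void do_work(struct item *it);    /* hypothetical per-item work */

static void
drain(struct sharded_q *q)
{
    pthread_mutex_lock(&outer);
    for (int i = 0; i < N_SHARDS; ++i) {
        pthread_mutex_lock(&q->lock[i]);
        while (!TAILQ_EMPTY(&q->head[i])) {
            struct item *it = TAILQ_FIRST(&q->head[i]);
            TAILQ_REMOVE(&q->head[i], it, entry);
            /* release both locks so other threads can enqueue meanwhile */
            pthread_mutex_unlock(&q->lock[i]);
            pthread_mutex_unlock(&outer);
            do_work(it);                     /* heavy work, no locks held */
            pthread_mutex_lock(&outer);      /* outer first... */
            pthread_mutex_lock(&q->lock[i]); /* ...then the shard */
        }
        pthread_mutex_unlock(&q->lock[i]);
    }
    pthread_mutex_unlock(&outer);
}
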
@@ -343,18 +359,6 @@ cache_bg_thread(void *arg)
c->atom_node = NULL;
- /* free the incr_queue */
- {
- struct cache_incr_node *in, *nextin;
- nextin = TAILQ_FIRST(&(c->incr_head));
- while ((in = nextin)) {
- nextin = TAILQ_NEXT(in, entry);
- TAILQ_REMOVE(&(c->incr_head), in, entry);
- in->node = 0;
- enif_free(in);
- }
- }
-
/* free the actual cache queues */
{
struct cache_node *n, *nextn;
@@ -370,6 +374,23 @@ cache_bg_thread(void *arg)
}
}
+ for (i = 0; i < N_INCR_BKT; ++i)
+ enif_mutex_lock(c->incr_lock[i]);
+
+ /* free the incr_queue */
+ for (i = 0; i < N_INCR_BKT; ++i) {
+ struct cache_incr_node *in, *nextin;
+ nextin = TAILQ_FIRST(&(c->incr_head[i]));
+ while ((in = nextin)) {
+ nextin = TAILQ_NEXT(in, entry);
+ TAILQ_REMOVE(&(c->incr_head[i]), in, entry);
+ in->node = 0;
+ enif_free(in);
+ }
+ enif_mutex_unlock(c->incr_lock[i]);
+ enif_mutex_destroy(c->incr_lock[i]);
+ }
+
/* unlock and destroy! */
enif_cond_destroy(c->check_cond);
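Note on the teardown hunk above: it first locks every bucket mutex, which quiesces all buckets at once (no reader can be mid-enqueue anywhere), and only then frees the leftover promotion records and destroys each mutex. A compact sketch of that quiesce-then-free order, again reusing the types from the first sketch (sharded_q_destroy is an illustrative name):

#include <stdlib.h>

static void
sharded_q_destroy(struct sharded_q *q)
{
    /* take every shard lock first so no enqueuer is in flight */
    for (int i = 0; i < N_SHARDS; ++i)
        pthread_mutex_lock(&q->lock[i]);

    for (int i = 0; i < N_SHARDS; ++i) {
        struct item *it, *next;
        for (it = TAILQ_FIRST(&q->head[i]); it != NULL; it = next) {
            next = TAILQ_NEXT(it, entry);
            TAILQ_REMOVE(&q->head[i], it, entry);
            free(it);
        }
        pthread_mutex_unlock(&q->lock[i]);
        pthread_mutex_destroy(&q->lock[i]);
    }
}
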
@@ -410,6 +431,7 @@ new_cache(ERL_NIF_TERM atom, int max_size, int min_q1_size)
{
struct cache *c;
struct atom_node *an;
+ int i;
c = enif_alloc(sizeof(*c));
memset(c, 0, sizeof(*c));
@@ -421,7 +443,10 @@ new_cache(ERL_NIF_TERM atom, int max_size, int min_q1_size)
c->check_cond = enif_cond_create("cache->check_cond");
TAILQ_INIT(&(c->q1.head));
TAILQ_INIT(&(c->q2.head));
- TAILQ_INIT(&(c->incr_head));
+ for (i = 0; i < N_INCR_BKT; ++i) {
+ TAILQ_INIT(&(c->incr_head[i]));
+ c->incr_lock[i] = enif_mutex_create("cache->incr_lock");
+ }
RB_INIT(&(c->expiry_head));
an = enif_alloc(sizeof(*an));
@@ -561,7 +586,7 @@ stats(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
enif_rwlock_rlock(c->cache_lock);
q1s = enif_make_uint64(env, c->q1.size);
q2s = enif_make_uint64(env, c->q2.size);
- incrs = enif_make_uint64(env, c->incr_count);
+ incrs = enif_make_uint64(env, __sync_fetch_and_add(&(c->incr_count), 0));
enif_rwlock_runlock(c->cache_lock);
ret = enif_make_tuple5(env,
enif_make_uint64(env, c->hit),
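Note on the stats change above: incr_count is no longer updated under ctrl_lock, so a plain read could race with the atomic writers. __sync_fetch_and_add(&x, 0) is the usual GCC __sync idiom for an atomic read with a full memory barrier, pairing with the __sync_add_and_fetch/__sync_sub_and_fetch updates elsewhere in the patch (with C11 this would be atomic_load, but the patch targets the __sync builtins). A tiny standalone demonstration:

#include <stdio.h>

static int counter;

int
main(void)
{
    __sync_add_and_fetch(&counter, 5);          /* atomic increment */
    int v = __sync_fetch_and_add(&counter, 0);  /* atomic read: add 0,
                                                   return current value */
    printf("%d\n", v);                          /* prints 5 */
    return 0;
}
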
@@ -666,8 +691,9 @@ get(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
struct cache_node *n;
struct cache_incr_node *in;
struct timespec now;
- int incrqs;
+ int incrqs, hashv, bkt;
ERL_NIF_TERM ret;
+ ErlNifTid tid;
if (!enif_is_atom(env, argv[0]))
return enif_make_badarg(env);
@@ -701,10 +727,12 @@ get(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
in->node = n;
__sync_add_and_fetch(&c->hit, 1);
- enif_mutex_lock(c->ctrl_lock);
- TAILQ_INSERT_TAIL(&(c->incr_head), in, entry);
- incrqs = ++(c->incr_count);
- enif_mutex_unlock(c->ctrl_lock);
+ tid = enif_thread_self();
+ HASH_SFH(&tid, sizeof(ErlNifTid), N_INCR_BKT, hashv, bkt);
+ enif_mutex_lock(c->incr_lock[bkt]);
+ TAILQ_INSERT_TAIL(&(c->incr_head[bkt]), in, entry);
+ enif_mutex_unlock(c->incr_lock[bkt]);
+ incrqs = __sync_add_and_fetch(&(c->incr_count), 1);
ret = enif_make_resource_binary(env, n->val, n->val, n->vsize);
enif_rwlock_runlock(c->lookup_lock);
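Note on the get() hunk above: each hitting thread hashes its own thread id with uthash's HASH_SFH macro, which here yields both a hash value and a bucket index, so promotions from the same thread always land in the same bucket and are enqueued under that bucket's lock alone; ctrl_lock is never touched on the hit path anymore. The bucket mask implied by this scheme requires N_INCR_BKT to be a power of two, which 8 satisfies. Since no single lock covers incr_count, it is maintained with an atomic increment. A sketch of the same bucket-selection idea under stated assumptions: FNV-1a stands in for SuperFastHash, pthread_self() for enif_thread_self(), and shard_for_self is a hypothetical name:

#include <pthread.h>
#include <stdint.h>
#include <string.h>

#define N_SHARDS 8   /* must be a power of two for the mask below */

static unsigned
shard_for_self(void)
{
    pthread_t tid = pthread_self();
    unsigned char buf[sizeof(tid)];
    uint32_t h = 2166136261u;               /* FNV-1a offset basis */

    memcpy(buf, &tid, sizeof(buf));
    for (size_t i = 0; i < sizeof(buf); ++i) {
        h ^= buf[i];
        h *= 16777619u;                     /* FNV-1a prime */
    }
    return h & (N_SHARDS - 1);              /* mask into a shard index */
}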