@dcramer
Created October 17, 2012 04:23

Goal

Implement a capped pool which starts dropping messages once it reaches <size> entries. This should have a much lower failure rate under load than a database or a standard queue.

For example, if there is a burst of data coming in, we'd first stick it into the pool, and then fire off a job to the queue (the job would take no arguments and simply say "get data from pool"; it could also eventually be replaced with a continuous processor). This guarantees that your queue isn't overflowing with large amounts of data, but rather with potential no-ops (just like our buffer implementation), and it also ensures you don't "get behind"; instead, you simply lose messages.
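As a rough end-to-end sketch of that flow, assuming a plain redis-py client rather than the redis.map() wrapper used below; enqueue(), POOL_KEY, and POOL_SIZE are hypothetical stand-ins, not part of the actual implementation:

import random
import sys

import redis

r = redis.Redis()
POOL_KEY = 'event-pool'  # hypothetical key name
POOL_SIZE = 10000        # hypothetical cap

def enqueue(task):
  # Stand-in for a real task queue (e.g. celery's task.delay());
  # here we just run the job inline.
  task()

def submit(data):
  # 1. Stick the payload into the capped pool: add it with a random
  #    score, then trim everything past the cap.
  p = r.pipeline()
  p.zadd(POOL_KEY, {data: random.randint(0, sys.maxsize)})
  p.zremrangebyrank(POOL_KEY, POOL_SIZE, -1)
  p.execute()
  # 2. Fire the argless "get data from pool" job. If the entry was
  #    trimmed before a worker gets here, the job is a cheap no-op.
  enqueue(process_pool)

def process_pool():
  # Worker body: drain one entry, if any survived trimming.
  item = r.zrange(POOL_KEY, 0, 0)
  if item:
    r.zremrangebyrank(POOL_KEY, 0, 0)
    return item[0]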

Add the entry to a sorted set, and trim the sorted set to SIZE. Entries get a random score from 0 to sys.maxint (small chance of collision).

with redis.map() as conn:
  # add the payload with a random score...
  conn.zadd(key, giant_blob_of_data, random.randint(0, sys.maxint))
  # ...and trim everything past the cap (ranks are zero-indexed, so
  # this keeps the first pool_size entries)
  conn.zremrangebyrank(key, pool_size, -1)

Get an entry by doing a zrange and a zremrangebyrank.

with redis.map() as conn:
  # fetch the lowest-scored entry (indexes are inclusive, so 0, 0
  # returns a single item)...
  item = conn.zrange(key, 0, 0)
  # ...and remove it
  conn.zremrangebyrank(key, 0, 0)

One thing of note here: any item assigned a higher score has a higher chance of being trimmed, since trimming removes from the high end of the set. This is probably statistically insignificant, but it is worth mentioning.
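To sanity-check that bias, here's a small, hypothetical pure-Python simulation (no Redis involved) of insert-then-trim with random scores:

import random

POOL_SIZE = 100
INSERTS = 10000
MAX_SCORE = 1 << 30

# Keep the pool as a sorted list of scores, trimming the high end
# after every insert, just like ZREMRANGEBYRANK pool_size .. -1.
pool = []
for _ in range(INSERTS):
  pool.append(random.randint(0, MAX_SCORE))
  pool.sort()
  del pool[POOL_SIZE:]

# Survivors cluster at the low end of the score range: with these
# numbers the largest surviving score is around 1% of MAX_SCORE.
print(max(pool) / MAX_SCORE)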

We use sys.maxint (instead of pool_size) as the score's upper bound, because we want to ensure that even at low data volumes there's a very low chance of a score collision.
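For a rough sense of how unlikely that is, the birthday approximation P ≈ n² / (2M) for n entries over M possible scores can be checked directly (a hypothetical back-of-the-envelope, not part of the gist):

def collision_probability(n, max_score):
  # Birthday-problem approximation: P(any collision) ~= n^2 / (2 * M)
  # for n scores drawn uniformly from M = max_score + 1 values.
  return n * n / (2.0 * (max_score + 1))

# 10,000 pooled entries against a 63-bit score space: negligible.
print(collision_probability(10000, 2**63 - 1))  # ~5.4e-12

# The same entries scored 0..pool_size: the estimate blows past 1,
# i.e. collisions are effectively guaranteed.
print(collision_probability(10000, 10000))      # ~5000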

Alternative Solution

(This is more statistically correct)

Adding an entry is a single call:

# add the payload with a random score in [0, pool_size]
redis.zadd(key, giant_blob_of_data, random.randint(0, pool_size))

Get an entry by doing a zrangebyscore (in both directions) and a zremrangebyscore.

val = random.randint(0, pool_size)
with redis.map() as conn:
  # find the nearest entry with score >= val...
  item_a = conn.zrangebyscore(key, val, '+inf', start=0, num=1, withscores=True)
  # ...and the nearest entry with score <= val
  item_b = conn.zrevrangebyscore(key, val, '-inf', start=0, num=1, withscores=True)

# pick whichever side matched; either works (assumes the pool is non-empty)
item, score = (item_a or item_b)[0]

# remove the item with the matching score
redis.zremrangebyscore(key, score, score)
@mwhooker

I tend to think having a hash where you HINCRBY exceptions to get a count (reducing memory overhead), plus a separate set to maintain cardinality, would be a good idea.

something like

def receive_exc(exc):
  with redis.pipeline() as p:
    p.hincrby('exceptions', exc, 1)
    p.sadd('exc-members', exc)
    p.execute()
  # evict random members once we exceed the cap
  while redis.scard('exc-members') > pool_size:
    v = redis.spop('exc-members')
    redis.hdel('exceptions', v)

@dlorenc commented Oct 17, 2012

Why not just use scard and spop? Add becomes:

with redis.map() as conn:
  conn.sadd(key, giant_blob_of_data)
  if conn.scard(key) > SIZE:
    conn.spop(key)

And retrieve simply checks if the item is still in the set. This works because spop just removes and returns a random element, which you can discard.
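A hypothetical sketch of both halves of that, with plain redis-py (the key name and SIZE are stand-ins):

import redis

r = redis.Redis()
KEY = 'event-pool'  # hypothetical key name
SIZE = 10000        # hypothetical cap

def add(data):
  r.sadd(KEY, data)
  # Past the cap, evict one random member. The just-added item may
  # itself be the one evicted, which is fine for a lossy pool.
  if r.scard(KEY) > SIZE:
    r.spop(KEY)

def get():
  # spop doubles as retrieval: it removes and returns a random
  # member, or None when the pool is empty.
  return r.spop(KEY)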
