@adambird
Created October 10, 2012 11:05
Describing a throttling approach for inbound messages
class WorkAllocator
  BUCKET_SIZE_SECS = 5

  def allocate_task_to_bucket(system_id, task, received_at)
    if (bucket = get_bucket_for_task(system_id, received_at))
      bucket.put task
    else
      # discard task
    end
  end

  def get_bucket_for_task(system_id, received_at)
    # initialise delta used to generate bucket_id
    bucket_id_delta = 0
    system_profile = get_system_profile(system_id)

    # find the next available bucket, respecting the retention limit
    while system_profile.retention_limit >= (bucket_id_delta * BUCKET_SIZE_SECS)
      # work out a root value for the bucket_id based on a time delta
      bucket_id_root = (received_at.to_i / BUCKET_SIZE_SECS) + bucket_id_delta
      # use the system profile to generate bucket_id, thus allowing
      # system-specific buckets
      bucket_id = system_profile.get_bucket_id(bucket_id_root)
      # return the bucket if it still has available work slots
      bucket = buckets[bucket_id]
      return bucket if bucket.count_of_tasks_for_system < (system_profile.throttle_rate_per_sec * BUCKET_SIZE_SECS)

      bucket_id_delta += 1
    end

    # log that the retention period for the task has passed
    nil
  end
end
class Worker
  WORKER_DELAY = 1

  def initialize(system_id = nil)
    @system_id = system_id
  end

  def process_buckets
    while processing?
      # generate the bucket id, lagging WORKER_DELAY buckets behind the current time
      bucket_id = (Time.now.to_i / WorkAllocator::BUCKET_SIZE_SECS) - WORKER_DELAY
      while (task = buckets[bucket_id].next_task)
        process task
      end
      # possibly need a wait here
    end
  end

  def buckets
    # get default buckets or system-specific buckets
  end
end
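
The allocation loop above relies on collaborators the gist leaves undefined (`get_system_profile`, `buckets`, `bucket.put`). As a rough illustration only, the sketch below fills them in with hypothetical in-memory stand-ins: a `SystemProfile` struct, a `SketchAllocator` class, and plain arrays as buckets. None of these names come from the original. It shows how tasks spill into later buckets once the per-bucket throttle limit is reached, and are discarded once the retention limit is exceeded.

```ruby
# Hypothetical in-memory stand-ins; not part of the original gist.
BUCKET_SIZE_SECS = 5

SystemProfile = Struct.new(:system_id, :retention_limit, :throttle_rate_per_sec) do
  # Prefix the time-based root so each system gets its own buckets (assumed scheme).
  def get_bucket_id(bucket_id_root)
    "#{system_id}:#{bucket_id_root}"
  end

  # Maximum number of tasks a single bucket may hold for this system.
  def bucket_capacity
    throttle_rate_per_sec * BUCKET_SIZE_SECS
  end
end

class SketchAllocator
  attr_reader :buckets

  def initialize(profile)
    @profile = profile
    # Each bucket is just an array of tasks, created on first access.
    @buckets = Hash.new { |hash, id| hash[id] = [] }
  end

  # Returns the id of the bucket the task was placed in, or nil if discarded.
  def allocate(task, received_at)
    delta = 0
    while @profile.retention_limit >= delta * BUCKET_SIZE_SECS
      root = (received_at.to_i / BUCKET_SIZE_SECS) + delta
      id = @profile.get_bucket_id(root)
      if @buckets[id].size < @profile.bucket_capacity
        @buckets[id] << task
        return id
      end
      delta += 1
    end
    nil # retention period exceeded, task discarded
  end
end

# One task per second, so five tasks per bucket; tasks may be deferred by up to 5 seconds.
profile = SystemProfile.new("sys-a", 5, 1)
allocator = SketchAllocator.new(profile)
received_at = Time.at(1_000)

ids = (1..11).map { |n| allocator.allocate("task-#{n}", received_at) }
```

With these assumed settings, the first five tasks land in the bucket for the current 5-second window, the next five are deferred to the following window, and the eleventh is discarded because no bucket within the retention limit has a free slot.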