Skip to content

Instantly share code, notes, and snippets.

@yaauie
Last active October 17, 2023 03:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yaauie/eb1720c7af0b1357c67787085ea6c607 to your computer and use it in GitHub Desktop.
Save yaauie/eb1720c7af0b1357c67787085ea6c607 to your computer and use it in GitHub Desktop.
# encoding: utf-8
require 'thread'
require 'monitor'
##
# The FairEnoughRouter is a generic "fair enough" router. When instantiated
# with a collection of objects, it can be used to select an arbitrary value,
# prioritising ones for which it hasn't recently been exceptional and
# those that are currently less-concurrently used.
class FairEnoughRouter
include MonitorMixin
##
# Creates a new Router with the provided objects
# that ignores errors older than the cooloff period
# @param objects [Enumerable<Object>]: a list of objects to lease out
# @param cooloff [Integer]: The cooloff period in seconds in which downstreams with
# recent errors are deprioritized (default: 60)
def initialize(objects, cooloff: 60)
fail ArgumentError unless objects&.any?
fail ArgumentError unless cooloff&.kind_of?(Integer)
super()
@object_states = objects.map do |object|
ObjectState.new(object)
end
@cooloff = cooloff
end
##
# Yields the block with a value, prioritizing values
# whose use has not recently errored, that are currently
# less concurrently-used.
#
# @yieldparam value [Object]
def select(&block)
selected = synchronize { pick_one.tap(&:increment) }
yield(selected.value)
rescue
synchronize { selected.mark_error }
raise
ensure
synchronize { selected.decrement }
end
private
def pick_one
threshold = Time.now.to_i - @cooloff
@object_states.sort_by do |object_state|
[
[object_state.last_error, threshold].max, # deprioritize recent errors
object_state.concurrent, # deprioritize high concurrency
object_state.last_start # deprioritize recent use
]
end.first
end
##
# @api private
class ObjectState
def initialize(object)
@value = object
@concurrent = 0
@last_error = 0
@last_start = 0
end
attr_reader :value
attr_reader :concurrent
attr_reader :last_start
attr_reader :last_error
def increment
@concurrent += 1
@last_start = Time.now.to_f
end
def decrement
@concurrent -= 1
end
def mark_error
@last_error = Time.now.to_i
end
end
private_constant :ObjectState
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment