@yaauie
Created April 29, 2019 21:15
Proof-of-concept generic back-pressure provider: lets blocking back-pressure be controlled from outside the code that performs the actions.
# The BackPressureProvider allows back-pressure to be applied to non-blocking APIs when those APIs also
# provide hooks for identifying when they _should_ block.
class BackPressureProvider
  def initialize(desc, logger)
    @desc = desc
    @logger = logger
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @back_pressure_engaged = false
    @timeout_max = 10 # cap, in seconds, on the wait between "still blocked" warnings
  end

  # Engages back-pressure; threads using `BackPressureProvider#execute` will be blocked until back-pressure
  # is removed.
  #
  # @param reason [String]: the reason back-pressure is being applied, to be included in the log message.
  # @return [void]
  def engage_back_pressure(reason)
    @mutex.synchronize do
      @back_pressure_engaged = true
      @logger.warn("#{@desc} back-pressure engaged: #{reason}")
    end
  end

  # Removes back-pressure, waking any threads that were blocked while using `BackPressureProvider#execute`.
  #
  # @return [void]
  def remove_back_pressure
    @mutex.synchronize do
      @back_pressure_engaged = false
      @logger.info("#{@desc} back-pressure removed.")
      @cond.broadcast # wakeup _all_ waiting threads
    end
  end

  # Executes the provided block, _after_ waiting out any back-pressure.
  #
  # @yieldreturn [Object] the value returned from the block is returned by this method
  # @return [Object]
  def execute
    # unlocked fast path: skip the mutex entirely when no back-pressure is engaged
    if @back_pressure_engaged
      timeout = 1
      start = Time.now
      thread_id = Thread.current.__id__
      loop do
        # instead of a blind sleep, wait for a notification with a timeout; this allows
        # us to _immediately_ begin sending events when we are unblocked.
        should_block = @mutex.synchronize do
          # re-check under the lock so that a release which happened after the unlocked
          # check above is not missed; wait on the local `timeout`, not an unset ivar.
          @cond.wait(@mutex, timeout) if @back_pressure_engaged
          @back_pressure_engaged
        end
        break unless should_block
        block_duration = Time.now - start
        @logger.warn("#{@desc} has been blocked with no movement for #{block_duration.round}s... (#{thread_id})")
        # double the wait before the next warning, capped at @timeout_max seconds
        timeout = [@timeout_max, (timeout * 2)].min
      end
    end
    yield
  end
end
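
The local `timeout` in `execute` starts at 1 and doubles after each warning, capped at `@timeout_max`. The sketch below works through that arithmetic to show how the warning interval grows. `backoff_timeouts` is a hypothetical helper, not part of the gist, and the cumulative figures assume every wait runs to its full timeout without being woken early.

```ruby
# Hypothetical helper (not part of the gist): lists the successive wait
# timeouts, in seconds, produced by the capped doubling used in `execute`.
def backoff_timeouts(count, initial: 1, max: 10)
  timeout = initial
  Array.new(count) do
    current = timeout
    timeout = [max, timeout * 2].min # same capped doubling as in `execute`
    current
  end
end

timeouts = backoff_timeouts(6)
# => [1, 2, 4, 8, 10, 10]

# Running total: seconds after which each "blocked with no movement" warning
# would be logged, assuming no wakeup arrives before a timeout expires.
elapsed = timeouts.each_with_object([]) { |t, acc| acc << ((acc.last || 0) + t) }
# => [1, 3, 7, 15, 25, 35]
```

With the default cap of 10, a thread that stays blocked logs frequently at first and then settles at one warning every 10 seconds.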