Skip to content

Instantly share code, notes, and snippets.

@iamatypeofwalrus
Created March 1, 2017 21:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iamatypeofwalrus/8105e291760c6235025a63fe65ed7705 to your computer and use it in GitHub Desktop.
Save iamatypeofwalrus/8105e291760c6235025a63fe65ed7705 to your computer and use it in GitHub Desktop.
Ruby Aws SDK V2 ConnectionPool forking issue
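# Reproduce the failure (forked children eventually crash):
#   AWS_ACCESS_KEY_ID=*** ruby connection_pool_issue.rb
#
# Run with the workaround enabled (does not crash). Setting the variable to any
# value enables the workaround, even "false":
#   empty_connection_pools_after_fork=true ruby connection_pool_issue.rb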
#
# Hypothesis:
#
# The AWS SDK keeps a pool of open HTTP connections in Seahorse
# (Seahorse::Client::NetHttp::ConnectionPool). After fork, a child process
# inherits the parent's file descriptors, and open sockets/connections are just
# file descriptors on Unix-like systems. The parent and a child can therefore
# both reuse the same pooled connection. When the two processes read from and
# write to one socket concurrently, one of them can consume bytes of the
# response meant for the other: a race condition between parent and child.
#
# The AWS SDK provides no top-level API for clearing the connection pool, nor
# does it document this behavior anywhere. You can, however, call into
# Seahorse::Client::NetHttp::ConnectionPool and empty the connection pools in
# the child right after fork.
require 'json'
require 'logger'
require 'uri'

require 'aws-sdk'
# Name of the SQS queue this reproduction creates, fills, drains and purges.
QUEUE_NAME = 'enqueue-tests'

# Load every SDK constant up front instead of lazily through autoload.
# NOTE(review): presumably done so forked children never trigger an autoload
# after fork. Confirm intent.
Aws.eager_autoload!

# Single SQS client used by every helper in this script. It is built with no
# explicit options before any fork happens, so forked children inherit it.
V2 = Aws::SQS::Client.new
# When a race condition occurs this proc will log the args passed to Aws::Structure.new
# This was determined from various stack traces as the point of failure
def register_kernel_func(logger)
proc = Proc.new { |event, file, line, id, b, klass|
next unless event == "call" && klass == Aws::Structure && id == :new
val = b.local_variable_get(:args)
if val.first.length > 0
logger.info("Aws::Structure.new received args that will cause a TypeError: args: #{val}")
end
}
Kernel.set_trace_func(proc)
end
# Look up the URL of the SQS queue named +queue_name+. If the queue does not
# exist yet, create it. Either way, wrap the URL in an Aws::SQS::Queue resource.
#
# Only the "queue does not exist" error triggers creation. Any other failure
# (bad credentials, throttling, networking) propagates instead of being
# mistaken for a missing queue and followed by a create call.
#
# NOTE(review): SQS error classes are built by the SDK from the service error
# code. The missing-queue error is NonExistentQueue for the query protocol and
# QueueDoesNotExist in newer SDK releases. Both are listed; confirm for the SDK
# version in use.
#
# @param queue_name [String]
# @return [Aws::SQS::Queue]
def create_queue(queue_name)
  queue_url =
    begin
      V2.get_queue_url(queue_name: queue_name).queue_url
    rescue Aws::SQS::Errors::NonExistentQueue, Aws::SQS::Errors::QueueDoesNotExist
      V2.create_queue(queue_name: queue_name).queue_url
    end
  Aws::SQS::Queue.new(url: queue_url)
end
# Send +num+ messages to +queue+. Message i carries the body {"iteration": i},
# JSON-encoded and then percent-encoded.
#
# URI.encode was deprecated for years and removed in Ruby 3.0, so the body is
# encoded with URI.encode_www_form_component instead. The JSON here contains
# no spaces, so any percent-decoder restores it exactly.
#
# @param queue [#send_message] e.g. an Aws::SQS::Queue
# @param num [Integer] number of messages to send
# @return [Integer] +num+ (the value of Integer#times)
def send_messages(queue, num)
  num.times do |i|
    payload = JSON.dump({ iteration: i })
    queue.send_message(message_body: URI.encode_www_form_component(payload))
  end
end
# Parent-side worker loop. Long-polls +queue+ and forks one child per received
# message, keeping at most +num_workers+ children alive at once. Loops until an
# error or interrupt escapes.
#
# @param queue [Aws::SQS::Queue]
# @param num_workers [Integer] maximum number of concurrent children (>= 1)
# @param logger [Logger]
# @param empty_connection_pools_after_fork [Boolean] empty the SDK's
#   Seahorse connection pools in each child right after fork (the workaround)
# @raise [ArgumentError] if +num_workers+ is less than 1
def receive_messages(queue, num_workers, logger, empty_connection_pools_after_fork: false)
  # With no worker slots the loop could never make progress (it used to spin).
  raise ArgumentError, "num_workers must be at least 1, got #{num_workers}" if num_workers < 1

  # SQS ReceiveMessage accepts MaxNumberOfMessages only in the range 1..10.
  max_batch = 10
  children = []
  loop do
    children = reap_children(children)
    num_messages = num_workers - children.length
    if num_messages < 1
      # Every slot is busy: block until a child exits, rather than spinning on
      # non-blocking waitpid calls. Process.wait reaps that child, so drop its
      # pid here to keep reap_children from waiting on it again.
      children.delete(Process.wait)
      next
    end

    logger.info "Parent: calling Aws::SQS::Queue#receive_messages"
    msgs = queue.receive_messages(
      wait_time_seconds: 20,
      max_number_of_messages: [num_messages, max_batch].min
    )
    logger.info "Parent: received #{msgs.count} messages from #{QUEUE_NAME}"
    msgs.each do |msg|
      children << fork_message_worker(msg, logger, empty_connection_pools_after_fork)
    end
  end
end

# Fork a child that processes +msg+, and return the child's pid. The child
# optionally empties the inherited connection pools, decodes and parses the
# body, then deletes the message.
def fork_message_worker(msg, logger, empty_connection_pools_after_fork)
  fork do
    if empty_connection_pools_after_fork
      logger.info "Attempting to empty connection pools"
      Seahorse::Client::NetHttp::ConnectionPool.pools.each(&:empty!)
    end
    # Essentially a no-op: the body is only decoded and parsed, never used.
    # URI.decode was removed in Ruby 3.0; decode_www_form_component replaces it.
    JSON.parse(URI.decode_www_form_component(msg.body))
    logger.info "Child: deleting message"
    msg.delete
  end
end
# Poll each pid in +children+ without blocking. Children that have exited are
# reaped and dropped; only the pids still running are returned.
#
# @param children [Array<Integer>] pids of forked children
# @return [Array<Integer>] a new array of the pids that have not exited yet
def reap_children(children)
  children.reject { |pid| Process.waitpid(pid, Process::WNOHANG) }
end
# Reproduction entry point. Creates (or reuses) the queue, sends 200 messages,
# then drains them with 10 forked workers until an error or interrupt occurs.
#
# The workaround is enabled when the environment variable
# empty_connection_pools_after_fork is set to any value, even "false" or "".
#
# On exit the queue is purged, so the next run starts empty. A purge already in
# progress is expected and only logged. Any other purge failure is logged
# rather than raised: an exception from this ensure block would replace the
# error or interrupt that ended the run.
def main
  logger = Logger.new(STDOUT)
  register_kernel_func(logger)
  logger.info "Creating queue with name #{QUEUE_NAME}"
  queue = create_queue(QUEUE_NAME)
  num_messages = 200
  logger.info "Sending #{num_messages} to #{QUEUE_NAME}"
  send_messages(queue, num_messages)
  logger.info "Processing messages in #{QUEUE_NAME}"
  receive_messages(
    queue,
    10,
    logger,
    empty_connection_pools_after_fork: !!ENV["empty_connection_pools_after_fork"]
  )
ensure
  logger.info "Error or interrupt ocurred. Cleaning up"
  if queue
    # A `rescue` modifier cannot filter by class: the old one-liner swallowed
    # every StandardError from purge and just evaluated the constant.
    begin
      queue.purge
    rescue Aws::SQS::Errors::PurgeQueueInProgress
      logger.info "Purge of #{QUEUE_NAME} already in progress"
    rescue StandardError => e
      logger.error "Failed to purge #{QUEUE_NAME}: #{e.class}: #{e.message}"
    end
  end
end
# Run the reproduction only when this file is executed directly
# (`ruby connection_pool_issue.rb`), not when it is required or loaded.
main if $PROGRAM_NAME == __FILE__
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment