-
-
Save julik/1e55255c8f582d3d96f7a93570dcc3d1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Streams the bytes of +byte_range+ fetched from +url+ into +write_destination+,
# chunk by chunk. When the upstream answers with a 500..505 status, returns no
# body, or the read raises EOFError, the request is re-issued for the part of
# the range that has not been delivered yet, at most
# MAX_UPSTREAM_RECONNECT_ATTEMPTS times.
#
# @param write_destination [#write] receives every chunk read from upstream
# @param url [String] full URL of the upstream resource
# @param byte_range [Range] inclusive byte range; both ends are formatted with %d
# @param task not used by this method
# @raise [UpstreamRefused] on a status other than 200/206 (and not 500..505),
#   or when the body length differs from the size of the requested range
# @raise [IntermittentUpstreamError, EOFError] once the retry budget is exhausted
def stream_through(write_destination, url, byte_range, task)
  LOGGER.debug { "Starting or restarting stream_through for #{url} with range #{byte_range}"}

  # Obtuse URL management is strong with this library
  uri = URI(url)
  base_uri = uri.dup
  base_uri.path = ""
  base_uri.query = nil

  # IN THEORY we could be able to multiplex this since most of our requests are to a small number of origins
  endpoint = Async::HTTP::URLEndpoint.parse(base_uri.to_s)
  client = Async::HTTP::Client.new(endpoint)

  # Unlike literally anything else async-http primordially desires
  # to have headers as arrays, not as a Hash.
  headers_for_upstream = [
    ["User-Agent", USER_AGENT_STRING],
    ["Range", "bytes=%d-%d" % [byte_range.begin, byte_range.end]],
  ]

  # These values have to be preserved when we retry. `retry` re-runs the method
  # body in the same local scope, so `||=` only initializes them on the first pass.
  request_attempts ||= 0
  bytes_read ||= 0
  # bytes_read counts bytes delivered across ALL attempts, so the resume offset
  # must always be computed from the range we were originally asked for.
  requested_range ||= byte_range

  response = client.get(uri.request_uri, headers_for_upstream)

  if (500..505).cover?(response.status)
    raise IntermittentUpstreamError, "Upstream responded with #{response.status}"
  end

  unless [200, 206].include?(response.status)
    raise UpstreamRefused, "Upstream refused with HTTP status #{response.status}"
  end

  unless body = response.body
    raise IntermittentUpstreamError, "Upstream returned an empty body"
  end

  # Do a sanity check that we indeed got sent what we asked for, length-wise
  # async-http in its infinite wisdom strips Content-Range and Content-Length from the response :facepalm:
  # but we still can query the body object even if we haven't read it out in full
  expected_content_length = RangeUtils.size_from_range(byte_range)
  if body.length != expected_content_length
    raise UpstreamRefused, "Upstream returned us #{body.length} bytes while we asked for #{expected_content_length}"
  end

  while chunk = body.read
    # This is used for testing the upstream server hanging up every now and then
    # raise EOFError if rand(1..128) == 1
    bytes_read += chunk.bytesize

    # If the write queue is full here, the task will be suspended using a Condition
    # and will only resume once the flow on the queue is restored.
    write_destination.write(chunk)
  end

  close_all(body, client, response)
rescue IntermittentUpstreamError, EOFError => ex
  request_attempts += 1
  raise if request_attempts > MAX_UPSTREAM_RECONNECT_ATTEMPTS

  # The connection _might_ break during read_nonblock, in which case we really need
  # to reconnect and restart but from the place where we left off.
  LOGGER.warn { "Upstream hung up or intermittently refused (#{ex.class}), restarting from where it went away (after #{bytes_read} bytes)" }

  # Resume right after the last byte handed to write_destination. Offsetting from
  # the already-advanced byte_range would skip data on the second and later retries.
  byte_range = (requested_range.begin + bytes_read)..requested_range.end

  # If we do not close the client the entire reactor will be blocked for 1 second during `client.call`
  close_all(body, client, response)
  retry
ensure
  close_all(body, client, response)
end
# Calls +close+ on every argument that is truthy and responds to +close+;
# nil, false and objects without a +close+ method are skipped.
#
# @param args [Array<Object, nil>] candidates to close, processed in order
# @return [Array] the +args+ array
def close_all(*args)
  args.each do |resource|
    next unless resource
    next unless resource.respond_to?(:close)

    resource.close
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment