
@julik
Created February 16, 2019 19:40

```ruby
require "uri"
require "async/http/client"
require "async/http/url_endpoint" # async-http of early 2019; later releases use Async::HTTP::Endpoint instead

# LOGGER, USER_AGENT_STRING, MAX_UPSTREAM_RECONNECT_ATTEMPTS, RangeUtils,
# IntermittentUpstreamError and UpstreamRefused are defined elsewhere (not part of this gist).
def stream_through(write_destination, url, byte_range, task)
  LOGGER.debug { "Starting or restarting stream_through for #{url} with range #{byte_range}" }
  # These values must be preserved across `retry`, which re-runs the method body
  # but keeps its local variables, hence `||=`
  request_attempts ||= 0
  bytes_read ||= 0
  original_begin ||= byte_range.begin
  # async-http's URL handling is obtuse: the endpoint must be built from the bare origin,
  # so strip the path and query and pass the request URI separately
  uri = URI(url)
  base_uri = uri.dup
  base_uri.path = ""
  base_uri.query = nil
  # In theory we could multiplex (reuse one client per origin), since most of our requests go to a small number of origins
  endpoint = Async::HTTP::URLEndpoint.parse(base_uri.to_s)
  client = Async::HTTP::Client.new(endpoint)
  # Unlike most other HTTP libraries, async-http wants headers as an array of [name, value] pairs, not a Hash.
  # HTTP byte ranges are inclusive, so byte_range is expected to be an inclusive Range (a..b).
  headers_for_upstream = [
    ["User-Agent", USER_AGENT_STRING],
    ["Range", "bytes=%d-%d" % [byte_range.begin, byte_range.end]],
  ]
  response = client.get(uri.request_uri, headers_for_upstream)
  if (500..505).cover?(response.status)
    raise IntermittentUpstreamError, "Upstream responded with #{response.status}"
  end
  unless [200, 206].include?(response.status)
    raise UpstreamRefused, "Upstream refused with HTTP status #{response.status}"
  end
  unless (body = response.body)
    raise IntermittentUpstreamError, "Upstream returned an empty body"
  end
  # Sanity-check that we got the number of bytes we asked for.
  # async-http strips Content-Range and Content-Length from the response,
  # but the body object still reports its length before it has been read in full.
  expected_content_length = RangeUtils.size_from_range(byte_range)
  if body.length != expected_content_length
    raise UpstreamRefused, "Upstream returned #{body.length} bytes while we asked for #{expected_content_length}"
  end
  # Read the body chunk by chunk; `read` returns nil once it is exhausted
  while (chunk = body.read)
    # Uncomment to test recovery from the upstream hanging up every now and then
    # raise EOFError if rand(1..128) == 1
    bytes_read += chunk.bytesize
    # If the write queue is full, the task is suspended on a Condition
    # and only resumes once the queue drains again.
    write_destination.write(chunk)
  end
  close_all(body, client, response)
rescue IntermittentUpstreamError, EOFError => ex
  request_attempts += 1
  raise if request_attempts > MAX_UPSTREAM_RECONNECT_ATTEMPTS
  # The connection _might_ break during read_nonblock, in which case we need
  # to reconnect and resume from the place where we left off.
  LOGGER.warn { "Upstream hung up or intermittently refused (#{ex.class}), restarting from where it left off (after #{bytes_read} bytes)" }
  # bytes_read counts every byte written across all attempts, so offset it from the
  # original start of the range; adding it to the already advanced begin would skip data
  byte_range = (original_begin + bytes_read)..byte_range.end
  # If we do not close the client, the entire reactor is blocked for 1 second during `client.call`
  close_all(body, client, response)
  retry
ensure
  close_all(body, client, response)
end
```
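
The resume logic in the rescue branch is the subtle part: `bytes_read` accumulates across attempts, so the restart offset has to be computed from the range's original start. The sketch below simulates this in plain Ruby without async-http. `FlakyUpstream`, `stream_with_resume` and `MAX_ATTEMPTS` are hypothetical names, and the "hangup" is a scheduled `EOFError` rather than a real socket failure.

```ruby
# Hypothetical upstream that serves a byte range in two-character chunks
# (ASCII test data) and hangs up after a scheduled number of bytes on each call.
class FlakyUpstream
  def initialize(data, hangups)
    @data = data
    @hangups = hangups.dup # e.g. [5, 3]: 1st call hangs up once >= 5 bytes were sent, 2nd once >= 3
  end

  def stream(range)
    cutoff = @hangups.shift # nil once no more hangups are scheduled
    sent = 0
    @data.byteslice(range).scan(/.{1,2}/m).each do |chunk|
      raise EOFError, "simulated hangup" if cutoff && sent >= cutoff
      yield chunk
      sent += chunk.bytesize
    end
  end
end

MAX_ATTEMPTS = 3 # hypothetical stand-in for MAX_UPSTREAM_RECONNECT_ATTEMPTS

def stream_with_resume(upstream, byte_range, out)
  original_begin = byte_range.begin
  attempts = 0
  bytes_read = 0 # cumulative across all attempts
  begin
    # Resume from the original start plus everything already delivered
    current = (original_begin + bytes_read)..byte_range.end
    upstream.stream(current) do |chunk|
      bytes_read += chunk.bytesize
      out << chunk
    end
  rescue EOFError
    attempts += 1
    raise if attempts > MAX_ATTEMPTS
    retry
  end
  out
end

result = stream_with_resume(FlakyUpstream.new("0123456789abcdefghij", [5, 3]), 2..15, +"")
puts result # prints 23456789abcdef
```

With the two scheduled hangups, the first attempt delivers offsets 2..7, the second resumes at 8 and delivers 8..11, and the third fetches 12..15. Adding the cumulative count to the already advanced start instead (8 + 10 on the second retry) would request 18..15, an empty range, and silently drop the last four bytes.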
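```ruby
# Close every argument that responds to #close, skipping nils
# (body, client or response may not be assigned yet when an error occurs)
def close_all(*args)
  args.each do |arg|
    arg.close if arg.respond_to?(:close)
  end
end
```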
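
The length sanity check relies on `RangeUtils.size_from_range`, which the gist does not include. Assuming byte ranges follow HTTP semantics (both ends inclusive), a helper along these lines would compute the expected size and build the matching `Range` header. `RangeSketch` is a hypothetical name, and accepting exclusive Ruby ranges (`a...b`) is an addition of this sketch, not something the original code does.

```ruby
# Hypothetical stand-in for RangeUtils (not shown in the gist).
# HTTP byte ranges are inclusive: "bytes=0-99" covers 100 bytes.
module RangeSketch
  module_function

  # Number of bytes covered by a Ruby Range of byte offsets
  def size_from_range(range)
    last = range.exclude_end? ? range.end - 1 : range.end
    last - range.begin + 1
  end

  # [name, value] pair in the array-of-pairs form async-http expects
  def range_header(range)
    last = range.exclude_end? ? range.end - 1 : range.end
    ["Range", format("bytes=%d-%d", range.begin, last)]
  end
end

p RangeSketch.size_from_range(0..99)   # prints 100
p RangeSketch.size_from_range(0...100) # prints 100
p RangeSketch.range_header(10...20)    # prints ["Range", "bytes=10-19"]
```

Normalizing to the inclusive last offset in one place keeps the header and the expected length from drifting apart by one byte.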
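
The comment about the write queue describes backpressure: when the queue is full, the producer is suspended until the consumer catches up. The gist does this with Async fibers and a Condition; as a swapped-in, thread-based analogue, Ruby's stdlib `SizedQueue` shows the same behaviour. The queue size of 2 and the `nil` end-of-stream marker are arbitrary choices for this sketch.

```ruby
# Thread-based analogue of a bounded write queue (the gist itself uses Async fibers).
# SizedQueue#push blocks the producer while the queue already holds `max` items.
queue = SizedQueue.new(2)
peak = 0

producer = Thread.new do
  %w[a b c d e].each do |chunk|
    queue.push(chunk)             # suspends here while 2 chunks are waiting
    peak = [peak, queue.size].max # never exceeds queue.max
  end
  queue.push(nil)                 # end-of-stream marker
end

received = []
while (chunk = queue.pop)         # pop returns nil only for our end marker
  received << chunk
  sleep 0.01                      # deliberately slow consumer
end
producer.join

puts received.join # prints abcde
```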