|
require "bundler/setup" |
|
require "debug" |
|
require "base64" |
|
require "aws-sdk-s3" |
|
require "dotenv/load" |
|
|
|
BUFFER_SIZE = 5 * 1024 * 1024 # 5 megabytes |
|
|
|
# time curl -s -H "content-type: application/octet-stream" --data-binary @tmp/input.20mb --url http://localhost:9292/stream/resources/1/file |
|
# 0.01s user 0.02s system 0% cpu 8.835 total |
|
# |
|
# time curl -s -H "content-type: application/octet-stream" --data-binary @tmp/input.20mb --url http://localhost:9292/multipart/resources/1/file |
|
# 0.01s user 0.01s system 0% cpu 9.693 total |
|
# |
|
# time curl -s -H "content-type: application/octet-stream" --data-binary @tmp/input.20mb --url http://localhost:9292/multipart_stream/resources/1/file |
|
# 0.01s user 0.02s system 0% cpu 8.690 total |
|
|
|
class StreamingUploader |
|
def call(env) |
|
# http://localhost:9292/stream/resources/:id/file |
|
# http://localhost:9292/multipart/resources/:id/file |
|
# http://localhost:9292/multipart_stream/resources/:id/file |
|
# |
|
url_pattern = %r{^/(?<strategy>\w+)/resources/(?<id>\d+)/file$} |
|
|
|
if env["REQUEST_METHOD"] != "POST" |
|
return [404, {"content-type" => "text/plain"}, ["Invalid request method"]] |
|
end |
|
|
|
if !(matches = url_pattern.match(env["REQUEST_PATH"])) |
|
return [404, {"content-type" => "text/plain"}, ["Not Found"]] |
|
end |
|
|
|
id, strategy = matches.values_at("id", "strategy") |
|
|
|
unless %w[stream multipart multipart_stream].include?(strategy) |
|
return [400, {"content-type" => "text/plain"}, ["Invalid upload strategy"]] |
|
end |
|
|
|
bucket_name = ENV.fetch("AWS_BUCKET_NAME") |
|
key_name = "resources_#{id}_file" |
|
|
|
case strategy |
|
when "stream" |
|
bucket = Aws::S3::Bucket.new(bucket_name) |
|
object = bucket.object(key_name) |
|
|
|
object.upload_stream(part_size: BUFFER_SIZE) do |out| |
|
while (chunk = env["rack.input"].read(BUFFER_SIZE)) |
|
out << chunk |
|
end |
|
end |
|
|
|
[200, {"content-type" => "text/plain"}, ["OK"]] |
|
when "multipart" |
|
client = Aws::S3::Client.new |
|
upload = client.create_multipart_upload({bucket: bucket_name, key: key_name}) |
|
parts = [] |
|
part_number = 1 |
|
|
|
while (chunk = env["rack.input"].read(BUFFER_SIZE)) |
|
parts << client.upload_part({ |
|
part_number:, |
|
bucket: bucket_name, |
|
key: key_name, |
|
upload_id: upload.upload_id, |
|
body: chunk |
|
}) |
|
|
|
part_number += 1 |
|
end |
|
|
|
completed = client.complete_multipart_upload({ |
|
bucket: bucket_name, |
|
key: key_name, |
|
upload_id: upload.upload_id, |
|
multipart_upload: { |
|
parts: parts.map.with_index { |p, i| {etag: p.etag, part_number: i + 1} } |
|
} |
|
}) |
|
|
|
response_body = JSON.generate({ |
|
url: completed.location, |
|
bucket: completed.bucket, |
|
key: completed.key, |
|
version_id: completed.version_id |
|
}) |
|
|
|
[200, {"content-type" => "application/json"}, [response_body]] |
|
when "multipart_stream" |
|
client = Aws::S3::Client.new |
|
options = {part_size: BUFFER_SIZE} |
|
|
|
uploader = Aws::S3::MultipartStreamUploader.new({client:}.merge(options)) |
|
|
|
completed = uploader.upload(options.merge(bucket: bucket_name, key: key_name)) do |out| |
|
while (chunk = env["rack.input"].read(BUFFER_SIZE)) |
|
out << chunk |
|
end |
|
end |
|
|
|
response_body = JSON.generate({ |
|
url: completed.location, |
|
bucket: completed.bucket, |
|
key: completed.key, |
|
version_id: completed.version_id |
|
}) |
|
|
|
[200, {"content-type" => "application/json"}, [response_body]] |
|
else |
|
[500, {"content-type" => "text/plain"}, "This should not happen, question your life choices"] |
|
end |
|
end |
|
end |
|
|
|
app = StreamingUploader.new |
|
run app |