@reagent
Last active November 22, 2024 18:56
Streaming file uploads to S3 buckets
AWS_BUCKET_NAME=

Streaming Uploads to S3 with Rack

Inspired by this post, I wanted to see whether a file could be streamed directly to S3 without first caching its contents on the server. The same approach could be added to a Rails app using its built-in routing capabilities.
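As a rough illustration of the idea, the sketch below reads an input stream in fixed-size chunks so that only one chunk is held in memory at a time. The `each_chunk` helper is hypothetical and not part of this gist, and a `StringIO` stands in for the request body.

```ruby
require "stringio"

# Hypothetical helper (not part of the gist): yields successive chunks of at
# most `size` bytes until the IO is exhausted.
def each_chunk(io, size)
  return enum_for(:each_chunk, io, size) unless block_given?

  # IO#read(length) returns nil at end of input, which ends the loop.
  while (chunk = io.read(size))
    yield chunk
  end
end

# A 12-byte input read 5 bytes at a time; the last chunk is the remainder.
input = StringIO.new("a" * 12)
each_chunk(input, 5) { |chunk| puts chunk.bytesize }
```

The application uses the same `while (chunk = io.read(size))` loop, with `env["rack.input"]` as the IO and each chunk handed to the S3 SDK.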

Setup

This application uses the AWS SDK for S3 for the final upload. I recommend running it under aws-vault so that the right AWS credentials are available as environment variables.

To run the application, create a .env file and provide a bucket that you have upload access to, then start the application:

cp -n .env{.example,}
bundle
aws-vault exec <profile> -- bundle exec rackup
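
The example .env file ships with `AWS_BUCKET_NAME=` left blank. Assuming dotenv loads a blank assignment as an empty string, `ENV.fetch("AWS_BUCKET_NAME")` would not raise in that case, and the failure would only surface later at upload time. A minimal sketch of an earlier check follows; the `bucket_name_from` helper is hypothetical and not part of this gist.

```ruby
# Hypothetical helper (not part of the gist): returns the bucket name, or
# raises with a message that points at the .env file when it is unset or blank.
def bucket_name_from(env = ENV)
  name = env.fetch("AWS_BUCKET_NAME", "").strip
  raise KeyError, "AWS_BUCKET_NAME is missing or empty; set it in .env" if name.empty?

  name
end
```

Calling `bucket_name_from` once at boot, rather than inside each request, would make a misconfigured environment fail before any upload is attempted.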

Usage

With the application running, you can upload a file from your local machine with cURL:

curl \
  -H "content-type: application/octet-stream" \
  --data-binary @config.ru \
  http://localhost:9292/stream/resources/1/file
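
For a client written in Ruby rather than cURL, a rough equivalent using only the standard library might look like the sketch below. The `build_upload_request` and `upload` helpers are hypothetical and not part of this gist. The request body is read from the file while it is sent instead of being loaded into a String first.

```ruby
require "net/http"
require "stringio"
require "uri"

# Hypothetical helper (not part of the gist): builds a POST request whose body
# is read from `io` as it is sent.
def build_upload_request(url, io)
  request = Net::HTTP::Post.new(URI(url))
  request["content-type"] = "application/octet-stream"
  request.content_length = io.size
  request.body_stream = io
  request
end

# Sends the file at `path` to `url`; only call this with the application running.
def upload(url, path)
  uri = URI(url)
  File.open(path, "rb") do |file|
    request = build_upload_request(url, file)
    Net::HTTP.start(uri.host, uri.port) { |http| http.request(request) }
  end
end

# Example (requires the server to be up):
#   upload("http://localhost:9292/stream/resources/1/file", "config.ru")
```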

You can trigger other implementations by changing the URL:

curl \
  -H "content-type: application/octet-stream" \
  --data-binary @config.ru \
  http://localhost:9292/multipart/resources/1/file

curl \
  -H "content-type: application/octet-stream" \
  --data-binary @config.ru \
  http://localhost:9292/multipart_stream/resources/1/file

The ID in the URL becomes part of the S3 object key (resources_<id>_file), so changing the ID in the URL uploads to a different key.
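
The mapping from URL to upload strategy and object key can be sketched in isolation as follows. The `route` helper is hypothetical; the pattern, the strategy names, and the key format mirror those in the application code.

```ruby
# Hypothetical helper (not part of the gist): returns [strategy, key] for a
# recognized upload path, or nil when the path or strategy is not supported.
def route(path)
  pattern = %r{\A/(?<strategy>\w+)/resources/(?<id>\d+)/file\z}
  strategies = %w[stream multipart multipart_stream]

  matches = pattern.match(path)
  return nil unless matches && strategies.include?(matches[:strategy])

  [matches[:strategy], "resources_#{matches[:id]}_file"]
end

strategy, key = route("/multipart/resources/7/file")
puts "#{strategy} -> #{key}"
```

Because the ID must match `\d+`, a path such as /stream/resources/abc/file is rejected before any S3 call is made.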

# config.ru
require "bundler/setup"
require "debug"
require "base64"
require "json"
require "aws-sdk-s3"
require "dotenv/load"

# Chunk size for reads and upload parts (S3 requires at least 5 MB for every part except the last).
BUFFER_SIZE = 5 * 1024 * 1024 # 5 megabytes

# time curl -s -H "content-type: application/octet-stream" --data-binary @tmp/input.20mb --url http://localhost:9292/stream/resources/1/file
# 0.01s user 0.02s system 0% cpu 8.835 total
#
# time curl -s -H "content-type: application/octet-stream" --data-binary @tmp/input.20mb --url http://localhost:9292/multipart/resources/1/file
# 0.01s user 0.01s system 0% cpu 9.693 total
#
# time curl -s -H "content-type: application/octet-stream" --data-binary @tmp/input.20mb --url http://localhost:9292/multipart_stream/resources/1/file
# 0.01s user 0.02s system 0% cpu 8.690 total

class StreamingUploader
  def call(env)
    # http://localhost:9292/stream/resources/:id/file
    # http://localhost:9292/multipart/resources/:id/file
    # http://localhost:9292/multipart_stream/resources/:id/file
    #
    url_pattern = %r{\A/(?<strategy>\w+)/resources/(?<id>\d+)/file\z}

    # Only POST is supported; 405 is the appropriate status for any other method.
    if env["REQUEST_METHOD"] != "POST"
      return [405, {"content-type" => "text/plain", "allow" => "POST"}, ["Invalid request method"]]
    end

    # PATH_INFO is the key defined by the Rack specification; REQUEST_PATH is a server-specific extra.
    if !(matches = url_pattern.match(env["PATH_INFO"]))
      return [404, {"content-type" => "text/plain"}, ["Not Found"]]
    end

    id, strategy = matches.values_at("id", "strategy")

    unless %w[stream multipart multipart_stream].include?(strategy)
      return [400, {"content-type" => "text/plain"}, ["Invalid upload strategy"]]
    end

    bucket_name = ENV.fetch("AWS_BUCKET_NAME")
    key_name = "resources_#{id}_file"

    case strategy
    when "stream"
      # High-level API: the SDK handles the multipart upload behind upload_stream.
      bucket = Aws::S3::Bucket.new(bucket_name)
      object = bucket.object(key_name)

      object.upload_stream(part_size: BUFFER_SIZE) do |out|
        while (chunk = env["rack.input"].read(BUFFER_SIZE))
          out << chunk
        end
      end

      [200, {"content-type" => "text/plain"}, ["OK"]]
    when "multipart"
      # Low-level API: create the upload, send each chunk as a part, then complete it.
      client = Aws::S3::Client.new
      upload = client.create_multipart_upload({bucket: bucket_name, key: key_name})

      parts = []
      part_number = 1

      while (chunk = env["rack.input"].read(BUFFER_SIZE))
        parts << client.upload_part({
          part_number:,
          bucket: bucket_name,
          key: key_name,
          upload_id: upload.upload_id,
          body: chunk
        })

        part_number += 1
      end

      completed = client.complete_multipart_upload({
        bucket: bucket_name,
        key: key_name,
        upload_id: upload.upload_id,
        multipart_upload: {
          parts: parts.map.with_index { |p, i| {etag: p.etag, part_number: i + 1} }
        }
      })

      response_body = JSON.generate({
        url: completed.location,
        bucket: completed.bucket,
        key: completed.key,
        version_id: completed.version_id
      })

      [200, {"content-type" => "application/json"}, [response_body]]
    when "multipart_stream"
      # The SDK's stream uploader, used directly so the completed response is available.
      client = Aws::S3::Client.new
      options = {part_size: BUFFER_SIZE}

      uploader = Aws::S3::MultipartStreamUploader.new({client:}.merge(options))

      completed = uploader.upload(options.merge(bucket: bucket_name, key: key_name)) do |out|
        while (chunk = env["rack.input"].read(BUFFER_SIZE))
          out << chunk
        end
      end

      response_body = JSON.generate({
        url: completed.location,
        bucket: completed.bucket,
        key: completed.key,
        version_id: completed.version_id
      })

      [200, {"content-type" => "application/json"}, [response_body]]
    else
      # Rack 3 response bodies must respond to #each, so the message is wrapped in an array.
      [500, {"content-type" => "text/plain"}, ["This should not happen, question your life choices"]]
    end
  end
end

app = StreamingUploader.new

run app

# Gemfile
source "https://rubygems.org"

gem "aws-sdk-s3", "~> 1.173"
gem "base64", "~> 0.2.0"
gem "debug", platforms: %i[mri mingw x64_mingw]
gem "dotenv", "~> 3.1"
gem "nokogiri", "~> 1.16"
gem "puma", "~> 6.4"
gem "rack", "~> 3.1"
gem "rackup", "~> 2.2"
gem "standard", "~> 1.42"

GEM
  remote: https://rubygems.org/
  specs:
    ast (2.4.2)
    aws-eventstream (1.3.0)
    aws-partitions (1.1012.0)
    aws-sdk-core (3.213.0)
      aws-eventstream (~> 1, >= 1.3.0)
      aws-partitions (~> 1, >= 1.992.0)
      aws-sigv4 (~> 1.9)
      jmespath (~> 1, >= 1.6.1)
    aws-sdk-kms (1.96.0)
      aws-sdk-core (~> 3, >= 3.210.0)
      aws-sigv4 (~> 1.5)
    aws-sdk-s3 (1.173.0)
      aws-sdk-core (~> 3, >= 3.210.0)
      aws-sdk-kms (~> 1)
      aws-sigv4 (~> 1.5)
    aws-sigv4 (1.10.1)
      aws-eventstream (~> 1, >= 1.0.2)
    base64 (0.2.0)
    debug (1.9.2)
      irb (~> 1.10)
      reline (>= 0.3.8)
    dotenv (3.1.4)
    io-console (0.7.2)
    irb (1.14.1)
      rdoc (>= 4.0.0)
      reline (>= 0.4.2)
    jmespath (1.6.2)
    json (2.8.2)
    language_server-protocol (3.17.0.3)
    lint_roller (1.1.0)
    mini_portile2 (2.8.8)
    nio4r (2.7.4)
    nokogiri (1.16.7)
      mini_portile2 (~> 2.8.2)
      racc (~> 1.4)
    nokogiri (1.16.7-arm64-darwin)
      racc (~> 1.4)
    parallel (1.26.3)
    parser (3.3.6.0)
      ast (~> 2.4.1)
      racc
    psych (5.2.0)
      stringio
    puma (6.4.3)
      nio4r (~> 2.0)
    racc (1.8.1)
    rack (3.1.8)
    rackup (2.2.1)
      rack (>= 3)
    rainbow (3.1.1)
    rdoc (6.8.1)
      psych (>= 4.0.0)
    regexp_parser (2.9.2)
    reline (0.5.11)
      io-console (~> 0.5)
    rubocop (1.68.0)
      json (~> 2.3)
      language_server-protocol (>= 3.17.0)
      parallel (~> 1.10)
      parser (>= 3.3.0.2)
      rainbow (>= 2.2.2, < 4.0)
      regexp_parser (>= 2.4, < 3.0)
      rubocop-ast (>= 1.32.2, < 2.0)
      ruby-progressbar (~> 1.7)
      unicode-display_width (>= 2.4.0, < 3.0)
    rubocop-ast (1.36.1)
      parser (>= 3.3.1.0)
    rubocop-performance (1.22.1)
      rubocop (>= 1.48.1, < 2.0)
      rubocop-ast (>= 1.31.1, < 2.0)
    ruby-progressbar (1.13.0)
    standard (1.42.1)
      language_server-protocol (~> 3.17.0.2)
      lint_roller (~> 1.0)
      rubocop (~> 1.68.0)
      standard-custom (~> 1.0.0)
      standard-performance (~> 1.5)
    standard-custom (1.0.2)
      lint_roller (~> 1.0)
      rubocop (~> 1.50)
    standard-performance (1.5.0)
      lint_roller (~> 1.1)
      rubocop-performance (~> 1.22.0)
    stringio (3.1.2)
    unicode-display_width (2.6.0)

PLATFORMS
  arm64-darwin-23
  ruby

DEPENDENCIES
  aws-sdk-s3 (~> 1.173)
  base64 (~> 0.2.0)
  debug
  dotenv (~> 3.1)
  nokogiri (~> 1.16)
  puma (~> 6.4)
  rack (~> 3.1)
  rackup (~> 2.2)
  standard (~> 1.42)

BUNDLED WITH
   2.5.3