I have an abstraction in my application code called a "CloudFile". It's where I store information in the database about files on S3, and it gives me a resource that other resources can own. For example, a user has an avatar_cloud_file_id.
On the front-end, I load this relationship and display the avatar with user.avatar_cloud_file.download_url.
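For illustration, the owning side is just a regular Ecto association. Here's a minimal sketch of what a user schema might look like (the RL.User module and its fields are just for illustration, not the real app's code):
defmodule RL.User do
  use Ecto.Schema

  schema "users" do
    field :name, :string

    # Gives the user the avatar_cloud_file_id column mentioned above
    belongs_to :avatar_cloud_file, RL.CloudFile

    timestamps()
  end
end
Loading it is an ordinary preload, e.g. RL.Repo.get!(RL.User, id) |> RL.Repo.preload(:avatar_cloud_file). The CloudFile schema itself: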
defmodule RL.CloudFile do
use Ecto.Schema
import Ecto.Changeset
@timestamps_opts type: :utc_datetime_usec
schema "cloud_files" do
field :content_type, :string
field :content_hash, :string
field :size, :string
field :filename, :string
field :external_url, :string
field :height, :integer
field :is_uploaded, :boolean, default: false
field :token, :string
field :width, :integer
timestamps()
end
@doc false
def changeset(cloud_file, params) do
cloud_file
|> cast(params, __schema__(:fields) -- [:id])
|> Utils.Changeset.set_default(:token, :uuid)
|> validate_required([
:token,
:filename,
:content_type
])
|> unique_constraint(:token)
end
def create(params) do
struct(RL.CloudFile)
|> RL.CloudFile.changeset(params)
|> RL.Repo.insert()
end
def storage_bucket, do: "#{Util.config(:prefix)}-cloud-files"
def storage_key(cloud_file) do
cloud_file.token
end
def clear_from_storage(cloud_file) do
# not the end of the world if this fails. Just an orphaned file on s3
ExAws.S3.delete_object(
storage_bucket(),
storage_key(cloud_file)
)
|> ExAws.request()
cloud_file
|> changeset(%{
"is_uploaded" => false,
"external_url" => nil
})
|> RL.Repo.update!()
end
end
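The changeset above leans on a small Utils.Changeset.set_default/3 helper that isn't shown. A minimal sketch of what it could look like, assuming the :uuid strategy simply fills the field with a generated UUID when none is given:
defmodule Utils.Changeset do
  import Ecto.Changeset

  # Fill `field` with a generated UUID when the changeset has no value for it yet
  def set_default(changeset, field, :uuid) do
    case get_field(changeset, field) do
      nil -> put_change(changeset, field, Ecto.UUID.generate())
      _value -> changeset
    end
  end
end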
Then, in the view for the CloudFile, I provide the URL the front-end uses to download it and a signed upload URL to create/replace it:
defmodule RLWeb.CloudFileView do
use RLWeb, :view
use JaSerializer.PhoenixView
attributes([
:content_type,
:filename,
:download_url,
:upload_url,
:height,
:width,
:token,
:is_uploaded,
:inserted_at,
:updated_at
])
def download_url(cloud_file) do
cond do
cloud_file.is_uploaded ->
cf_domain = Util.config(:prefix_params)["CLOUD_FILE_CLOUDFRONT_DOMAIN"]
signing_params =
RL.CloudFiles.Signing.signed_url_params(
"https://#{cf_domain}/#{RL.CloudFile.storage_key(cloud_file)}"
)
%URI{
scheme: "https",
host: cf_domain,
path: "/#{RL.CloudFile.storage_key(cloud_file)}",
query:
signing_params
# |> Map.merge(%{
# "response-content-type" => "application/pdf",
# "response-content-disposition" => ~s(inline; filename="yourbutt.pdf"),
# "response-cache-control" => "max-age=2592000"
# })
|> URI.encode_query()
}
|> URI.to_string()
cloud_file.external_url ->
cloud_file.external_url
true ->
nil
end
end
def upload_url(cloud_file) do
bucket = RL.CloudFile.storage_bucket()
object_name = RL.CloudFile.storage_key(cloud_file)
query_params = [
"x-amz-acl": "private"
]
{:ok, url} =
:s3
|> ExAws.Config.new(Application.get_all_env(:ex_aws))
|> ExAws.S3.presigned_url(
:put,
bucket,
object_name,
virtual_host: true,
query_params: query_params
)
url
end
def is_uploaded(cloud_file) do
cloud_file.is_uploaded || false
end
end
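RL.CloudFiles.Signing is referenced above but not shown. CloudFront signed URLs with a canned policy boil down to signing a small policy document with a CloudFront key pair and appending Expires, Signature, and Key-Pair-Id query params. Here's a rough sketch of what that module could look like; the config keys for the key pair id and PEM-encoded private key are assumptions:
defmodule RL.CloudFiles.Signing do
  @expires_in 60 * 60

  def signed_url_params(url) do
    expires = System.system_time(:second) + @expires_in

    # Canned policy: CloudFront rebuilds this exact string from the URL and Expires,
    # so the format has to match what AWS expects.
    policy =
      ~s({"Statement":[{"Resource":"#{url}","Condition":{"DateLessThan":{"AWS:EpochTime":#{expires}}}}]})

    signature =
      policy
      |> :public_key.sign(:sha, private_key())
      |> Base.encode64()
      # CloudFront's URL-safe flavor of base64
      |> String.replace("+", "-")
      |> String.replace("=", "_")
      |> String.replace("/", "~")

    %{
      "Expires" => Integer.to_string(expires),
      "Signature" => signature,
      "Key-Pair-Id" => Util.config(:prefix_params)["CLOUD_FILE_SIGNING_KEY_ID"]
    }
  end

  defp private_key do
    Util.config(:prefix_params)["CLOUD_FILE_SIGNING_PRIVATE_KEY"]
    |> :public_key.pem_decode()
    |> hd()
    |> :public_key.pem_entry_decode()
  end
end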
Here's the Terraform configuration to create everything you need on the AWS side (bucket, queue, CloudFront distribution, and SSM parameters):
resource "aws_s3_bucket" "cloud_file_bucket" {
bucket = "${var.prefix}-cloud-files"
acl = "private"
cors_rule {
allowed_headers = ["*"]
allowed_methods = ["PUT", "POST"]
allowed_origins = ["*"]
expose_headers = ["ETag"]
max_age_seconds = 3000
}
}
resource "aws_ssm_parameter" "cloud_file_bucket_name" {
name = "/${var.prefix}/CLOUD_FILE_BUCKET"
type = "String"
value = aws_s3_bucket.cloud_file_bucket.id
overwrite = true
}
resource "aws_sqs_queue" "cloud_file" {
name = "${var.prefix}-cloud-file-queue"
redrive_policy = "{\"deadLetterTargetArn\":\"${aws_sqs_queue.deadletter.arn}\",\"maxReceiveCount\":4}"
policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": "*",
"Action": "sqs:SendMessage",
"Resource": "arn:aws:sqs:*:*:${var.prefix}-cloud-file-queue",
"Condition": {
"ArnEquals": { "aws:SourceArn": "${aws_s3_bucket.cloud_file_bucket.arn}" }
}
}
]
}
POLICY
}
resource "aws_s3_bucket_notification" "cloud_file_uploaded_notification" {
bucket = "${aws_s3_bucket.cloud_file_bucket.id}"
queue {
queue_arn = "${aws_sqs_queue.cloud_file.arn}"
events = ["s3:ObjectCreated:*"]
}
}
resource "aws_ssm_parameter" "cloud_file_sqs_queue_url" {
name = "/${var.prefix}/CLOUD_FILE_SQS_QUEUE_URL"
type = "String"
value = "${aws_sqs_queue.cloud_file.id}"
overwrite = true
}
resource "aws_cloudfront_distribution" "cloud_file_distribution" {
origin {
domain_name = "${aws_s3_bucket.cloud_file_bucket.bucket_regional_domain_name}"
origin_id = "S3-${aws_s3_bucket.cloud_file_bucket.id}"
s3_origin_config {
origin_access_identity = "${aws_cloudfront_origin_access_identity.web_origin_access_identity.cloudfront_access_identity_path}"
}
}
enabled = true
is_ipv6_enabled = true
price_class = "PriceClass_200"
wait_for_deployment = false
default_cache_behavior {
viewer_protocol_policy = "allow-all"
allowed_methods = ["GET", "HEAD"]
cached_methods = ["GET", "HEAD"]
target_origin_id = "S3-${aws_s3_bucket.cloud_file_bucket.id}"
trusted_signers = ["self"]
forwarded_values {
query_string = true
cookies {
forward = "none"
}
}
min_ttl = 0
default_ttl = 3600
max_ttl = 86400
}
viewer_certificate {
cloudfront_default_certificate = true
}
restrictions {
geo_restriction {
restriction_type = "none"
}
}
}
resource "aws_ssm_parameter" "cloud_file_distribution_domain" {
name = "/${var.prefix}/CLOUD_FILE_CLOUDFRONT_DOMAIN"
type = "String"
value = "${aws_cloudfront_distribution.cloud_file_distribution.domain_name}"
overwrite = true
}
data "aws_iam_policy_document" "cloud_file_s3_policy" {
statement {
actions = ["s3:GetObject"]
resources = ["${aws_s3_bucket.cloud_file_bucket.arn}/*"]
principals {
type = "AWS"
identifiers = ["${aws_cloudfront_origin_access_identity.web_origin_access_identity.iam_arn}"]
}
}
statement {
actions = ["s3:ListBucket"]
resources = ["${aws_s3_bucket.cloud_file_bucket.arn}"]
principals {
type = "AWS"
identifiers = ["${aws_cloudfront_origin_access_identity.web_origin_access_identity.iam_arn}"]
}
}
}
resource "aws_s3_bucket_policy" "cloud_file_bucket" {
bucket = "${aws_s3_bucket.cloud_file_bucket.id}"
policy = "${data.aws_iam_policy_document.cloud_file_s3_policy.json}"
}
Then you create an Elixir Broadway consumer for the SQS queue, which tells you when a cloud file has been successfully uploaded:
defmodule RL.CloudFiles.QueueWorker do
use Broadway
alias Broadway.Message
@url Util.config(:prefix_params)["CLOUD_FILE_SQS_QUEUE_URL"]
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producers: [
default: [
module: {
if(Util.is_test?, do: Broadway.DummyProducer, else: BroadwaySQS.Producer),
queue_url: @url,
config: [
access_key_id: Application.fetch_env!(:ex_aws, :access_key_id),
secret_access_key: Application.fetch_env!(:ex_aws, :secret_access_key)
]
}
]
],
processors: [
default: []
],
batchers: [
default: [
batch_size: 10
]
]
)
end
# No per-message work; everything happens in handle_batch/4
def handle_message(_, message, _) do
message
end
def handle_batch(_, messages, _, _) do
messages
|> Enum.each(fn %Message{data: data} ->
data
|> Jason.decode()
|> case do
{:ok, %{"Records" => records}} ->
records
|> Enum.each(fn
%{"s3" => %{"object" => %{"key" => key} = attrs}} ->
token = Path.basename(key)
RL.CloudFile
|> RL.Repo.get_by(token: token)
|> case do
nil ->
nil
cf ->
cf
|> RL.CloudFile.changeset(%{
"is_uploaded" => true,
"size" => if(attrs["size"], do: "#{attrs["size"]}"),
"content_hash" => attrs["etag"] || attrs["eTag"]
})
|> RL.Repo.update!()
end
_ ->
nil
end)
_ ->
# Ignore anything that isn't an S3 "Records" payload (e.g. the s3:TestEvent
# message S3 sends when the notification is first configured)
nil
end
end)
messages
end
end
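Like any Broadway pipeline, the worker needs to be started under the application's supervision tree. A minimal sketch, assuming the usual generated Application module:
defmodule RL.Application do
  use Application

  def start(_type, _args) do
    children = [
      RL.Repo,
      RLWeb.Endpoint,
      # Consumes the S3 upload notifications from SQS
      RL.CloudFiles.QueueWorker
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: RL.Supervisor)
  end
end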
And a test for the worker:
defmodule RL.CloudFiles.QueueWorkerTest do
use RL.DataCase
test "process a cloud file message" do
cloud_file = create(:cloud_file, is_uploaded: false)
data = "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-west-2\",\"eventTime\":\"2019-11-01T13:50:09.728Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"A1LUDJVGANV5IQ\"},\"requestParameters\":{\"sourceIPAddress\":\"75.169.185.0\"},\"responseElements\":{\"x-amz-request-id\":\"C747FA4C0F0E99AD\",\"x-amz-id-2\":\"KxlREb9pETqV8o/P0Lr4VKWz14HgwnPf6uyyHoidAJjcs/LJte+4u7DmV/zKIyPiSTRxTHBhV0k=\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"tf-s3-queue-20191030135243399600000001\",\"bucket\":{\"name\":\"dev-cloud-files\",\"ownerIdentity\":{\"principalId\":\"A1LUDJVGANV5IQ\"},\"arn\":\"arn:aws:s3:::dev-cloud-files\"},\"object\":{\"key\":\"#{cloud_file.token}\",\"size\":430019,\"eTag\":\"c3043e9b33ca17a2bc0c5a22305e983d\",\"sequencer\":\"005DBC38060A6FBB13\"}}}]}"
ref = Broadway.test_messages(RL.CloudFiles.QueueWorker, [data])
assert_receive {:ack, ^ref, successful, failed}
assert length(successful) == 1
assert length(failed) == 0
assert RL.Repo.get(RL.CloudFile, cloud_file.id).is_uploaded
end
end
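Putting the pieces together: the client asks the API for a new CloudFile, PUTs the file bytes straight to the returned upload_url, and the Broadway worker flips is_uploaded once the SQS notification arrives. Here's a hedged sketch of the controller end of that flow; the controller, route, and error handling are assumptions rather than the app's real code:
defmodule RLWeb.CloudFileController do
  use RLWeb, :controller

  def create(conn, %{"filename" => filename, "content_type" => content_type}) do
    case RL.CloudFile.create(%{"filename" => filename, "content_type" => content_type}) do
      {:ok, cloud_file} ->
        # The serialized response includes upload_url for the client to PUT the bytes to
        conn
        |> put_status(:created)
        |> render("show.json-api", data: cloud_file)

      {:error, _changeset} ->
        send_resp(conn, :unprocessable_entity, "")
    end
  end
end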
If you want a Lambda function that automatically creates thumbnails, start by wiring the bucket up to an SNS topic:
resource "aws_sns_topic" "cloud_file_created_topic" {
name = "${var.prefix}-cloud-file-created"
policy = <<POLICY
{
"Version":"2012-10-17",
"Statement":[{
"Effect": "Allow",
"Principal": {"AWS":"*"},
"Action": "SNS:Publish",
"Resource": "arn:aws:sns:*:*:${var.prefix}-page-created",
"Condition":{
"ArnLike":{"aws:SourceArn":"${aws_s3_bucket.dvp_cloud_file_bucket.arn}"}
}
}]
}
POLICY
}
resource "aws_s3_bucket_notification" "cloud_file_created_notification" {
bucket = "${aws_s3_bucket.dvp_cloud_file_bucket.id}"
topic {
topic_arn = "${aws_sns_topic.cloud_file_created_topic.arn}"
events = ["s3:ObjectCreated:*"]
}
}
resource "aws_ssm_parameter" "cloud_file_created_topic_parameter" {
name = "/${var.prefix}/CLOUD_FILE_CREATED_TOPIC_ARN"
type = "String"
value = "${aws_sns_topic.cloud_file_created_topic.arn}"
overwrite = true
}
Then add a Serverless configuration for the thumbnail function:
service: dvp
provider:
name: aws
runtime: nodejs8.10
profile: whatever
region: us-west-2
stage: ${opt:stage, 'dev'}
environment:
STAGE: ${self:provider.stage}
iamRoleStatements:
- Effect: Allow
Action:
- s3:GetObject
- s3:GetObjectAcl
- s3:PutObject
- s3:PutObjectAcl
      Resource:
        - 'arn:aws:s3:::${self:provider.stage}-cloud-files/*'
        # the function writes thumbnails to a separate bucket (see destBucket in the handler)
        - 'arn:aws:s3:::${self:provider.stage}-cloud-file-thumbnails/*'
- Effect: Allow
Action:
- lambda:InvokeFunction
Resource: '*'
- Effect: Allow
Action:
- sns:publish
Resource: '*'
functions:
createThumbnail:
name: ${self:provider.stage}-create-thumbnail
handler: create-thumbnail.createThumbnail
timeout: 30
memorySize: 3008
events:
- sns: ${ssm:/${self:provider.stage}/CLOUD_FILE_CREATED_TOPIC_ARN}
Then the Lambda function itself looks like this:
const exec = require('child_process').exec
const AWS = require('aws-sdk')
const fs = require('fs')
const path = require('path')
const rimraf = require('rimraf')
AWS.config.update({ region: 'us-west-2' })
const s3 = new AWS.S3()
const destBucket = `${process.env.STAGE}-cloud-file-thumbnails`
exports.createThumbnail = (event, cxt, callback) => {
// const context = event.context
const rand = Math.random()
.toString(36)
.substring(2)
const tmpFile = `/tmp/original-file-${rand}.pdf`
const outDir = `/tmp/thumbnails-${rand}`
event = JSON.parse(event.Records[0].Sns.Message)
const bucket = event.Records[0].s3.bucket.name
const key = event.Records[0].s3.object.key
const match = key.match(/^(.+?)\/(.+?)\.pdf$/)
if (!match) {
  // Keys that don't look like "<token>/<page>.pdf" aren't thumbnail candidates
  return callback(null, `skipped ${key}`)
}
const context = {
tmpFile: tmpFile,
outDir: outDir,
bucket: bucket,
key: key,
token: match[1],
page: match[2],
dim: '200',
callback: callback,
}
download(context)
}
function download(context) {
const params = {
Bucket: context.bucket,
Key: context.key,
}
const file = fs.createWriteStream(context.tmpFile)
file.on('close', function() {
console.log(`Image ${context.key} has been downloaded to ${context.tmpFile}.`)
createThumbnail(context)
})
s3.getObject(params)
.createReadStream()
.on('error', function(err) {
console.log(err)
return context.callback('could not download from s3', null)
})
.pipe(file)
}
function createThumbnail(context) {
// prepare directory
if (!fs.existsSync(context.outDir)) {
fs.mkdirSync(context.outDir)
}
exec(`gs -sDEVICE=png16 -o /tmp/image.png -r400 ${context.tmpFile}`, (err, stdout, stderr) => {
if (err) {
  console.error('Ghostscript failed.', err, stderr)
  return context.callback('failed to rasterize pdf', null)
}
exec(
`convert -thumbnail ${context.dim}x${context.dim} -background white -alpha remove -alpha off /tmp/image.png ${context.outDir}/thumb.png`,
(err, stdout, stderr) => {
console.log(stdout)
if (err) {
console.error('Failed to tile pdf.', err)
console.log(stderr)
return context.callback('failed to createThumbnail pdf', null)
} else {
console.log('Successfully tile pdf')
upload(context)
}
}
)
})
}
function upload(context) {
console.log(`Reading ${context.outDir} for individual pdf files`)
let destKey = `${context.token}/${context.page}.png`
fs.readFile(`${context.outDir}/thumb.png`, (err, data) => {
if (err) {
console.error('Failed to read image.', err)
return context.callback('failed to read thumbnail', null)
} else {
// fs.readFile already returns a Buffer, so it can be uploaded as-is
console.log(`Uploading thumbnail to ${destKey}`)
s3.putObject(
  {
    Bucket: destBucket,
    Key: destKey,
    Body: data,
ACL: 'public-read',
},
  (err, resp) => {
    // putObject callbacks are (err, data); bail out if the upload failed
    if (err) {
      console.error('Failed to upload thumbnail.', err)
      return context.callback('failed to upload thumbnail', null)
    }
    cleanUp(context)
    console.log('Successfully uploaded thumbnail image for page.')
  }
)
}
})
}
function cleanUp(context) {
console.log(`Deleting files`)
// Delete the temporary file.
fs.unlink(context.tmpFile, err => {
if (err) {
console.error(`Failed to delete ${context.tmpFile}`, err)
return context.callback('failed to delete', null)
} else {
rimraf(context.outDir, err => {
if (err) {
console.error(`Failed to delete ${context.outDir}`, err)
return context.callback('failed to delete', null)
} else {
console.log(`Success!`)
context.callback(null, 'success')
}
})
}
})
}