Skip to content

Instantly share code, notes, and snippets.

@atomkirk
Last active Oct 12, 2022
Embed
What would you like to do?
Storing files on S3 with Elixir

I have this abstraction in my application code called a "CloudFile". This is where I store in the database information about files on S3 and it gives me a resource for other resources to own. For example, a user would have an avatar_cloud_file_id. On the front-end, I would load this relationship and display the avatar with user.avatar_cloud_file.download_url

defmodule RL.CloudFile do
  use Ecto.Schema
  import Ecto.Changeset

  @timestamps_opts type: :utc_datetime_usec

  schema "cloud_files" do
    field :content_type, :string
    field :content_hash, :string
    field :size, :string
    field :filename, :string
    field :external_url, :string
    field :height, :integer
    field :is_uploaded, :boolean, default: false
    field :token, :string
    field :width, :integer

    timestamps()
  end

  @doc false
  def changeset(cloud_file, params) do
    cloud_file
    |> cast(params, __schema__(:fields) -- [:id])
    |> Utils.Changeset.set_default(:token, :uuid)
    |> validate_required([
      :token,
      :filename,
      :content_type
    ])
    |> unique_constraint(:token)
  end

  def create(params) do
    struct(RL.CloudFile)
    |> RL.CloudFile.changeset(params)
    |> RL.Repo.insert()
  end

  def storage_bucket, do: "#{Util.config(:prefix)}-cloud-files"

  def storage_key(cloud_file) do
    cloud_file.token
  end

  def clear_from_storage(cloud_file) do
    # not the end of the world if this fails. Just an orphaned file on s3
    ExAws.S3.delete_object(
      storage_bucket(),
      storage_key(cloud_file)
    )
    |> ExAws.request()

    cloud_file
    |> changeset(%{
      "is_uploaded" => false,
      "external_url" => nil
    })
    |> RL.Repo.update!()
  end
end

Then in the view for the CloudFile I provide the urls for the font-end to download it and a signed upload url to create/replace it:

defmodule RLWeb.CloudFileView do
  use RLWeb, :view
  use JaSerializer.PhoenixView

  attributes([
    :content_type,
    :filename,
    :download_url,
    :upload_url,
    :height,
    :width,
    :token,
    :is_uploaded,
    :inserted_at,
    :updated_at
  ])

  def download_url(cloud_file) do
    cond do
      cloud_file.is_uploaded ->
        cf_domain = Util.config(:prefix_params)["CLOUD_FILE_CLOUDFRONT_DOMAIN"]

        signing_params =
          RL.CloudFiles.Signing.signed_url_params(
            "https://#{cf_domain}/#{RL.CloudFile.storage_key(cloud_file)}"
          )

        %URI{
          scheme: "https",
          host: cf_domain,
          path: "/#{RL.CloudFile.storage_key(cloud_file)}",
          query:
            signing_params
            # |> Map.merge(%{
            #   "response-content-type" => "application/pdf",
            #   "response-content-disposition" => ~s(inline; filename="yourbutt.pdf"),
            #   "response-cache-control" => "max-age=2592000"
            # })
            |> URI.encode_query()
        }
        |> URI.to_string()

      cloud_file.external_url ->
        cloud_file.external_url

      true ->
        nil
    end
  end

  def upload_url(cloud_file) do
    bucket = RL.CloudFile.storage_bucket()
    object_name = RL.CloudFile.storage_key(cloud_file)

    query_params = [
      "x-amz-acl": "private"
    ]

    {:ok, url} =
      :s3
      |> ExAws.Config.new(Application.get_all_env(:ex_aws))
      |> ExAws.S3.presigned_url(
        :put,
        bucket,
        object_name,
        virtual_host: true,
        query_params: query_params
      )
    url
  end

  def is_uploaded(cloud_file) do
    cloud_file.is_uploaded || false
  end
end

Here's the terraform configuration to create everything you need on S3:

resource "aws_s3_bucket" "cloud_file_bucket" {
  bucket = "${var.prefix}-cloud-files"
  acl    = "private"

  cors_rule {
    allowed_headers = ["*"]
    allowed_methods = ["PUT", "POST"]
    allowed_origins = ["*"]
    expose_headers  = ["ETag"]
    max_age_seconds = 3000
  }
}

resource "aws_ssm_parameter" "cloud_file_bucket_name" {
  name      = "/${var.prefix}/CLOUD_FILE_BUCKET"
  type      = "String"
  value     = aws_s3_bucket.cloud_file_bucket.id
  overwrite = true
}

resource "aws_sqs_queue" "cloud_file" {
  name           = "${var.prefix}-cloud-file-queue"
  redrive_policy = "{\"deadLetterTargetArn\":\"${aws_sqs_queue.deadletter.arn}\",\"maxReceiveCount\":4}"

  policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": "*",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:*:*:${var.prefix}-cloud-file-queue",
      "Condition": {
        "ArnEquals": { "aws:SourceArn": "${aws_s3_bucket.cloud_file_bucket.arn}" }
      }
    }
  ]
}
POLICY
}

resource "aws_s3_bucket_notification" "cloud_file_uploaded_notification" {
  bucket = "${aws_s3_bucket.cloud_file_bucket.id}"

  queue {
    queue_arn = "${aws_sqs_queue.cloud_file.arn}"
    events    = ["s3:ObjectCreated:*"]
  }
}

resource "aws_ssm_parameter" "cloud_file_sqs_queue_url" {
  name      = "/${var.prefix}/CLOUD_FILE_SQS_QUEUE_URL"
  type      = "String"
  value     = "${aws_sqs_queue.cloud_file.id}"
  overwrite = true
}

resource "aws_cloudfront_distribution" "cloud_file_distribution" {
  origin {
    domain_name = "${aws_s3_bucket.cloud_file_bucket.bucket_regional_domain_name}"
    origin_id = "S3-${aws_s3_bucket.cloud_file_bucket.id}"
    s3_origin_config {
      origin_access_identity = "${aws_cloudfront_origin_access_identity.web_origin_access_identity.cloudfront_access_identity_path}"
    }
  }

  enabled = true
  is_ipv6_enabled = true
  price_class = "PriceClass_200"
  wait_for_deployment = false

  default_cache_behavior {
    viewer_protocol_policy = "allow-all"
    allowed_methods = ["GET", "HEAD"]
    cached_methods = ["GET", "HEAD"]
    target_origin_id = "S3-${aws_s3_bucket.cloud_file_bucket.id}"
    trusted_signers = ["self"]

    forwarded_values {
      query_string = true

      cookies {
        forward = "none"
      }
    }

    min_ttl = 0
    default_ttl = 3600
    max_ttl = 86400
  }

  viewer_certificate {
    cloudfront_default_certificate = true
  }

  restrictions {
    geo_restriction {
      restriction_type = "none"
    }
  }
}

resource "aws_ssm_parameter" "cloud_file_distribution_domain" {
  name = "/${var.prefix}/CLOUD_FILE_CLOUDFRONT_DOMAIN"
  type = "String"
  value = "${aws_cloudfront_distribution.cloud_file_distribution.domain_name}"
  overwrite = true
}

data "aws_iam_policy_document" "cloud_file_s3_policy" {
  statement {
    actions = ["s3:GetObject"]
    resources = ["${aws_s3_bucket.cloud_file_bucket.arn}/*"]

    principals {
      type = "AWS"
      identifiers = ["${aws_cloudfront_origin_access_identity.web_origin_access_identity.iam_arn}"]
    }
  }

  statement {
    actions = ["s3:ListBucket"]
    resources = ["${aws_s3_bucket.cloud_file_bucket.arn}"]

    principals {
      type = "AWS"
      identifiers = ["${aws_cloudfront_origin_access_identity.web_origin_access_identity.iam_arn}"]
    }
  }
}

resource "aws_s3_bucket_policy" "cloud_file_bucket" {
  bucket = "${aws_s3_bucket.cloud_file_bucket.id}"
  policy = "${data.aws_iam_policy_document.cloud_file_s3_policy.json}"
}

Then, you create an Elixir Broadway consumer to consume from an SQS Queue that tells you when a cloud file has been successfully

defmodule RL.CloudFiles.QueueWorker do
  use Broadway

  alias Broadway.Message

  @url Util.config(:prefix_params)["CLOUD_FILE_SQS_QUEUE_URL"]

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        default: [
          module: {
            if(Util.is_test?, do: Broadway.DummyProducer, else: BroadwaySQS.Producer),
            queue_url: @url,
            config: [
              access_key_id: Application.fetch_env!(:ex_aws, :access_key_id),
              secret_access_key: Application.fetch_env!(:ex_aws, :secret_access_key)
            ]
          }
        ]
      ],
      processors: [
        default: []
      ],
      batchers: [
        default: [
          batch_size: 10
        ]
      ]
    )
  end
  
  def handle_message(_, message, _) do
    message
  end

  def handle_batch(_, messages, _, _) do
    messages
    |> Enum.each(fn %Message{data: data} ->
      data
      |> Jason.decode()
      |> case do
        {:ok, %{"Records" => records}} ->
          records
          |> Enum.each(fn
            %{"s3" => %{"object" => %{"key" => key} = attrs}} ->
              token = Path.basename(key)

              RL.CloudFile
              |> RL.Repo.get_by(token: token)
              |> case do
                nil ->
                  nil

                cf ->
                  cf
                  |> RL.CloudFile.changeset(%{
                    "is_uploaded" => true,
                    "size" => if(attrs["size"], do: "#{attrs["size"]}"),
                    "content_hash" => attrs["etag"] || attrs["eTag"]
                  })
                  |> RL.Repo.update!()
              end

            _ ->
              nil
          end)
      end
    end)

    messages
  end
end

And a test for it:

defmodule RL.CloudFiles.QueueWorkerTest do
  use RL.DataCase

  test "process a cloud file message" do
    cloud_file = create(:cloud_file, is_uploaded: false)
    data = "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-west-2\",\"eventTime\":\"2019-11-01T13:50:09.728Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"A1LUDJVGANV5IQ\"},\"requestParameters\":{\"sourceIPAddress\":\"75.169.185.0\"},\"responseElements\":{\"x-amz-request-id\":\"C747FA4C0F0E99AD\",\"x-amz-id-2\":\"KxlREb9pETqV8o/P0Lr4VKWz14HgwnPf6uyyHoidAJjcs/LJte+4u7DmV/zKIyPiSTRxTHBhV0k=\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"tf-s3-queue-20191030135243399600000001\",\"bucket\":{\"name\":\"dev-cloud-files\",\"ownerIdentity\":{\"principalId\":\"A1LUDJVGANV5IQ\"},\"arn\":\"arn:aws:s3:::dev-cloud-files\"},\"object\":{\"key\":\"#{cloud_file.token}\",\"size\":430019,\"eTag\":\"c3043e9b33ca17a2bc0c5a22305e983d\",\"sequencer\":\"005DBC38060A6FBB13\"}}}]}"
    ref = Broadway.test_messages(RL.CloudFiles.QueueWorker, [data])
    assert_receive {:ack, ^ref, successful, failed}
    assert length(successful) == 1
    assert length(failed) == 0
    assert RL.Repo.get(RL.CloudFile, cloud_file.id).is_uploaded
  end
end

Lambda function to create thumbnails

If you want to create a lambda function that automatically creates thumbnails, you can do:

resource "aws_sns_topic" "cloud_file_created_topic" {
  name = "${var.prefix}-cloud-file-created"

  policy = <<POLICY
{
    "Version":"2012-10-17",
    "Statement":[{
        "Effect": "Allow",
        "Principal": {"AWS":"*"},
        "Action": "SNS:Publish",
        "Resource": "arn:aws:sns:*:*:${var.prefix}-page-created",
        "Condition":{
            "ArnLike":{"aws:SourceArn":"${aws_s3_bucket.dvp_cloud_file_bucket.arn}"}
        }
    }]
}
POLICY
}

resource "aws_s3_bucket_notification" "cloud_file_created_notification" {
  bucket = "${aws_s3_bucket.dvp_cloud_file_bucket.id}"

  topic {
    topic_arn = "${aws_sns_topic.cloud_file_created_topic.arn}"
    events = ["s3:ObjectCreated:*"]
  }
}

resource "aws_ssm_parameter" "cloud_file_created_topic_parameter" {
  name = "/${var.prefix}/CLOUD_FILE_CREATED_TOPIC_ARN"
  type = "String"
  value = "${aws_sns_topic.cloud_file_created_topic.arn}"
  overwrite = true
}

Then with a serverless configuration:

service: dvp

provider:
  name: aws
  runtime: nodejs8.10
  profile: whatever
  region: us-west-2
  stage: ${opt:stage, 'dev'}
  environment:
    STAGE: ${self:provider.stage}
  iamRoleStatements:
    - Effect: Allow
      Action:
        - s3:GetObject
        - s3:GetObjectAcl
        - s3:PutObject
        - s3:PutObjectAcl
      Resource:
        - 'arn:aws:s3:::${self:provider.stage}-cloud-files/*'
    - Effect: Allow
      Action:
        - lambda:InvokeFunction
      Resource: '*'
    - Effect: Allow
      Action:
        - sns:publish
      Resource: '*'

functions:
  createThumbnail:
    name: ${self:provider.stage}-create-thumbnail
    handler: create-thumbnail.createThumbnail
    timeout: 30
    memorySize: 3008
    events:
      - sns: ${ssm:/${self:provider.stage}/CLOUD_FILE_CREATED_TOPIC_ARN}

Then the cloud function be like:

const exec = require('child_process').exec
const AWS = require('aws-sdk')
const fs = require('fs')
const path = require('path')
const rimraf = require('rimraf')

AWS.config.update({ region: 'us-west-2' })
const s3 = new AWS.S3()

const destBucket = `${process.env.STAGE}-cloud-file-thumbnails`

exports.createThumbnail = (event, cxt, callback) => {
  // const context = event.context
  const rand = Math.random()
    .toString(36)
    .substring(2)
  const tmpFile = `/tmp/original-file-${rand}.pdf`
  const outDir = `/tmp/thumbnails-${rand}`

  event = JSON.parse(event.Records[0].Sns.Message)
  const bucket = event.Records[0].s3.bucket.name
  const key = event.Records[0].s3.object.key
  const match = key.match(/^(.+?)\/(.+?)\.pdf$/)

  const context = {
    tmpFile: tmpFile,
    outDir: outDir,
    bucket: bucket,
    key: key,
    token: match[1],
    page: match[2],
    dim: '200',
    callback: callback,
  }

  download(context)
}

function download(context) {
  const params = {
    Bucket: context.bucket,
    Key: context.key,
  }
  const file = fs.createWriteStream(context.tmpFile)
  file.on('close', function() {
    console.log(`Image ${context.key} has been downloaded to ${context.tmpFile}.`)
    createThumbnail(context)
  })
  s3.getObject(params)
    .createReadStream()
    .on('error', function(err) {
      console.log(err)
      return context.callback('could not download from s3', null)
    })
    .pipe(file)
}

function createThumbnail(context) {
  // prepare directory
  if (!fs.existsSync(context.outDir)) {
    fs.mkdirSync(context.outDir)
  }

  exec(`gs -sDEVICE=png16 -o /tmp/image.png -r400 ${context.tmpFile}`, (err, stdout, stderr) => {
    console.error(err, stdout, stderr, 'gs -sDEVICE')
    exec(
      `convert -thumbnail ${context.dim}x${context.dim} -background white -alpha remove -alpha off /tmp/image.png ${context.outDir}/thumb.png`,
      (err, stdout, stderr) => {
        console.log(stdout)
        if (err) {
          console.error('Failed to tile pdf.', err)
          console.log(stderr)
          return context.callback('failed to createThumbnail pdf', null)
        } else {
          console.log('Successfully tile pdf')
          upload(context)
        }
      }
    )
  })
}

function upload(context) {
  console.log(`Reading ${context.outDir} for individual pdf files`)

  let destKey = `${context.token}/${context.page}.png`

  fs.readFile(`${context.outDir}/thumb.png`, (err, data) => {
    if (err) {
      console.error('Failed to read image.', err)
      reject(err)
    } else {
      var base64data = new Buffer(data, 'binary')
      console.log(`Uploading thumbnail to ${destKey}`)
      s3.putObject(
        {
          Bucket: destBucket,
          Key: destKey,
          Body: base64data,
          ACL: 'public-read',
        },
        resp => {
          cleanUp(context)
          console.log('Successfully uploaded thumbnail image for page.')
        }
      )
    }
  })
}

function cleanUp(context) {
  console.log(`Deleting files`)

  // Delete the temporary file.
  fs.unlink(context.tmpFile, err => {
    if (err) {
      console.error(`Failed to delete ${context.tmpFile}`, err)
      return context.callback('failed to delete', null)
    } else {
      rimraf(context.outDir, err => {
        if (err) {
          console.error(`Failed to delete ${context.outDir}`, err)
          context.callback(1)
          return context.callback('failed to delete', null)
        } else {
          console.log(`Success!`)
          context.callback(null, 'success')
        }
      })
    }
  })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment