Skip to content

Instantly share code, notes, and snippets.

@TwistingTwists
Created July 16, 2024 18:40
Show Gist options
  • Save TwistingTwists/c4ba1edf72abcf9316eb0ee0e61edd42 to your computer and use it in GitHub Desktop.
Save TwistingTwists/c4ba1edf72abcf9316eb0ee0e61edd42 to your computer and use it in GitHub Desktop.
defmodule RazorNewWeb.S3Writer do
@moduledoc """
Module to stream video directly to S3 bucket.
But not via presigned url.
"""
use TypedStruct
@typedoc "S3Writer struct"
typedstruct do
field(:filename, String.t(), enforce: true)
field(:s3_config, ExAws.Config.t(), enforce: true)
field(:s3_init_op, ExAws.Operation.S3.t(), default: nil)
field(:s3_upload_op, ExAwsExAws.S3.Upload.t(), default: nil)
# start chunk number from 0 so that first chunk (zero-th chunk) is consumed in intializing request
# rest chunks can be numbered correctly, starting 1 which is how ex_aws_s3 expects chunks to be.
field(:part_number, non_neg_integer(), default: 0)
field(:parts, List.t(), default: [])
field(:accumulated_chunk, binary(), default: <<>>)
end
@behaviour Phoenix.LiveView.UploadWriter
alias ExAws.S3
require Logger
@impl Phoenix.LiveView.UploadWriter
def init(opts) do
Logger.info("inside s3 writer now")
{s3_config, rest} = Keyword.pop(opts, :s3_config, ExAws.Config.new(:s3))
file_name = Keyword.fetch!(rest, :filename)
state = %__MODULE__{
filename: file_name,
s3_config: s3_config
}
state = setup_s3_upload_state(state)
{:ok, state}
end
defp setup_s3_upload_state(state) do
s3_upload_op =
ExAws.S3.upload([], state.s3_config.bucket, state.filename)
s3_upload_op_with_upload_id =
case ExAws.S3.Upload.initialize(s3_upload_op, state.s3_config) do
{:ok, s3_upload_op_with_upload_id} -> s3_upload_op_with_upload_id
_ -> raise "Could not intiate upload to the file"
end
# set timeout to 3 min for slow connections
s3_upload_op_with_upload_id = %{
s3_upload_op_with_upload_id
| opts: Keyword.merge([timeout: 180 * 1000], s3_upload_op_with_upload_id.opts)
}
%{
state
| part_number: state.part_number + 1,
s3_upload_op: s3_upload_op_with_upload_id
}
end
@impl Phoenix.LiveView.UploadWriter
def meta(state) do
Map.take(state, [:filename, :s3_config, :part_number])
end
@impl Phoenix.LiveView.UploadWriter
def write_chunk(data, state) do
part_number = state.part_number
{accumulated_chunk, new_state} = accumulate_chunk(data, state)
if byte_size(accumulated_chunk) >= 5 * 1024 * 1024 do
upload_chunk(accumulated_chunk, new_state)
else
{:ok, new_state}
end
end
defp accumulate_chunk(data, state) do
new_accumulated_chunk = state.accumulated_chunk <> data
new_state = %{state | accumulated_chunk: new_accumulated_chunk}
{new_accumulated_chunk, new_state}
end
defp upload_chunk(data, state) do
part_number = state.part_number
case ExAws.S3.Upload.upload_chunk(
{data, part_number},
Map.delete(state.s3_upload_op, :src),
state.s3_config
) do
{^part_number, _etag} = part ->
{:ok,
%{
state
| part_number: part_number + 1,
parts: [part | state.parts],
accumulated_chunk: <<>>
}}
{:error, reason} ->
{:error, reason}
end
end
@impl Phoenix.LiveView.UploadWriter
def close(state, reason) do
case ExAws.S3.Upload.complete(state.parts, state.s3_upload_op, state.s3_config) do
{:ok, _} ->
Logger.warning("[complete] S3 Upload #{inspect(state.s3_config)}")
{:ok, state}
{:error, reason} ->
Logger.warning("[NOT complete] S3 Upload #{inspect(state.s3_config)}")
{:error, reason, state}
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment