Skip to content

Instantly share code, notes, and snippets.

@oppai
Created December 3, 2019 08:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oppai/36bd2de6ba3739d7ed539781bf9f275c to your computer and use it in GitHub Desktop.
Save oppai/36bd2de6ba3739d7ed539781bf9f275c to your computer and use it in GitHub Desktop.
S3 select objectのAPIクライアント & デコーダ
defmodule Aws.S3.Selector do
@doc """
S3SelectObject用のモジュール
> {:ok, result} = Aws.S3.Selector.query("my-bucket",
"x_report/result.parquet",
"select * from s3object")
> messages = Aws.S3.Selector.decode(result.body)
[%{
header: %{
":content-type" => "application/octet-stream",
":event-type" => "Records",
":message-type" => "event"
},
payload: "xxxxxxx\n"
},
%{
header: %{
":content-type" => "text/xml",
":event-type" => "Stats",
":message-type" => "event"
},
payload: "<Stats xmlns=\"\"><BytesScanned>4597</BytesScanned><BytesProcessed>543</BytesProcessed><BytesReturned>753</BytesReturned></Stats>"
},
%{
header: %{":event-type" => "End", ":message-type" => "event"},
payload: "failed"
}]
> messages |> Aws.S3.Selector.convert_messages()
[%{
"xxxx" => "yyy",
"yyyy" => "zzz"
},
%{
"xxxx" => "yyy",
"yyyy" => "zzz"
}]
Message Structure
Also See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
+--------------+---------------+-------------+----------+---------+-------------+-------+----
| total_length | header_length | payload_crc | header | payload | message_crc | .. Next
| 32 bit | 32 bits | 32 bit | x bit | y bit | 32 bits | .. Message
+--------------+---------------+-------------+----------+---------+-------------+------------
"""
def decode(
<<
total_len::big-integer-size(32),
header_len::big-integer-size(32),
prelude_crc::big-integer-size(32),
header::binary-size(header_len),
body::binary
>> = data
) do
unless is_valid_crc(<<total_len::big-integer-size(32), header_len::big-integer-size(32)>>, prelude_crc) do
raise Aws.S3.Selector.CRCError
end
# payload_length = total_length - header_length - sizeOf(total_length)
# - sizeOf(header_length) - sizeOf(prelude_crc) - sizeOf(message_crc)
payload_len = total_len - header_len - 16
# message_len = total_len - sizeOf(message_crc)
message_len = total_len - 4
decoded_header = decode_header(header)
body
|> case do
<<payload::binary-size(payload_len), m_crc::big-integer-size(32)>> when payload_len > 0 ->
<<m_data::binary-size(message_len), _::binary>> = data
unless is_valid_crc(m_data, m_crc), do: raise(Aws.S3.Selector.CRCError)
[%{header: decoded_header, payload: payload}]
<<payload::binary-size(payload_len), m_crc::big-integer-size(32), tail::binary>> when payload_len > 0 ->
<<m_data::binary-size(message_len), _::binary>> = data
unless is_valid_crc(m_data, m_crc), do: raise(Aws.S3.Selector.CRCError)
[%{header: decoded_header, payload: payload}] ++ decode(tail)
_ ->
[%{header: decoded_header, payload: ""}]
end
end
defp decode_header(data) do
case data do
<<kl::8, key::binary-size(kl), _type::8, vl::2*8, value::binary-size(vl)>> ->
%{key => value}
<<kl::8, key::binary-size(kl), _type::8, vl::2*8, value::binary-size(vl), tail::binary>> ->
Map.merge(%{key => value}, decode_header(tail))
_ ->
%{}
end
end
defp is_valid_crc(<<data::binary>>, checksum) do
CRC.crc_32(data) === checksum
end
def convert_messages(messages) do
messages
|> Enum.map(fn msg ->
msg.header[":event-type"]
|> case do
"Records" ->
msg.payload |> String.split("\n") |> Enum.reject(&(&1 == ""))
_ ->
[]
end
end)
|> List.flatten()
|> Enum.map(&Poison.decode!/1)
end
def query(bucket_name, parquet_path, expression) do
%ExAws.Operation.S3{
body: body(expression),
bucket: bucket_name,
headers: %{},
http_method: :post,
params: %{},
parser: &ExAws.Utils.identity/1,
path: "#{parquet_path}?select&select-type=2",
resource: "",
service: :s3,
stream_builder: nil
}
|> ExAws.request()
end
defp body(expression),
do: """
<?xml version="1.0" encoding="UTF-8"?>
<SelectRequest>
<Expression>#{expression}</Expression>
<ExpressionType>SQL</ExpressionType>
<InputSerialization>
<CompressionType>NONE</CompressionType>
<Parquet />
</InputSerialization>
<OutputSerialization>
<JSON>
<CompressionType>NONE</CompressionType>
<RecordDelimiter>\n</RecordDelimiter>
</JSON>
</OutputSerialization>
<RequestProgress>
<Enabled>FALSE</Enabled>
</RequestProgress>
</SelectRequest>
"""
defmodule(CRCError, do: defexception(message: "crc checksum error"))
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment