Skip to content

Instantly share code, notes, and snippets.

@maiha
Last active January 7, 2021 01:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maiha/1d372282a40dcbf79f615c91b8e982ac to your computer and use it in GitHub Desktop.
Save maiha/1d372282a40dcbf79f615c91b8e982ac to your computer and use it in GitHub Desktop.

Amazon SQS (標準キュー)

Amazon SQS (FIFOキュー)

  • https://dev.classmethod.jp/articles/sqs-new-fifo/
  • exactly once
  • 順序保証あり
  • メッセージdupなし
  • 重複チェックあり (短時間(デフォルト5分)の重複入力を排除する。厳密にやるならRe-runnable設計が必要)

料金

  • 「リクエスト数課金」+「転送量課金」
  • リクエスト数課金: 月間で100万リクエストまで無料。以降、100万リクエストごとに$0.5。
  • 転送量課金: INは無料。OUTは$0.9/GB。同じリージョンなら無料

キューURL

  • https://{REGION_ENDPOINT}/queue.|api-domain|/{YOUR_ACCOUNT_NUMBER}/{YOUR_QUEUE_NAME}
  • 例: https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue (標準キュー)
  • 例: https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue.fifo (FIFOキュー)
  • キューに対するAPIの操作のエンドポイントとして利用される

制限

  • payload は JSON, XML, text のみ (バイナリは使えない)
  • payload に不正な文字コードがある場合、rejectされる
  • 1通のメッセージの最大サイズは 256 KB
  • バッチ処理できるメッセージ数は最大 10
  • バッチ処理できる最大サイズは全メッセージ合算で 256 KB
  • バッチ処理を前提にすると、1通のメッセージの最大サイズは 25.6 KB
  • 1秒あたり最大300回のAPIコール (バッチで10通処理しても最大3000トランザクション)
  • キュー中の同一MessageGroupIdが持てるインフライトメッセージ数は最大 20,000
    • インフライトメッセージ: receiveされたがdeleteはされていないメッセージ
  • キューに保存可能な期間は最大で7日間
  • メッセージ単位の遅延設定はない (キュー単位のみ)
  • メッセージは64KB単位に1APIリクエストと計算される

キュー名

  • 80文字以内
  • 利用可能な文字: アルファベット、数字、ハイフン、アンダースコア
  • FIFOの場合は末尾に .fifo をつける

メッセージ

  • ユーザが自由に属性を10個まで追加可能

送信の順序保証

送信の Exactly Once

  • FIFOキューでは必ず重複排除が働く (設定により2通り)
  • ContentBasedDeduplication (コンテンツに基づく重複排除) が「有効」の場合
    • メッセージのpayloadから SHA-256 によって重複IDが自動的に作成され、重複検査される
    • ダイジェスト計算の対象は本文のみで、属性値は利用されない
    • 送信済みメッセージとしてダイジェスト値が保存される期間は5分間 (値は変更できるかも)
    • 保存期間(deduplication interval)を超えて同じメッセージが送信された場合は、送信が成功する
    • メッセージが削除されても、保存期間中は重複IDは保存され続ける
  • ContentBasedDeduplication (コンテンツに基づく重複排除) が「無効」の場合
    • 送信時に MessageDeduplicationId (String) としてユーザが明示する
    • MessageDeduplicationId がない場合はエラーになる

API

GET 例

https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue
?Action=SendMessage&MessageBody=Your%20message%20text
&Version=2012-11-05
&AUTHPARAMS

POST 例

POST /123456789012/MyQueue HTTP/1.1
Host: sqs.us-east-2.amazonaws.com
Content-Type: application/x-www-form-urlencoded

Action=SendMessage
&MessageBody=Your+Message+Text
&Expires=2020-10-15T12%3A00%3A00Z
&Version=2012-11-05
&AUTHPARAMS

一覧 (listQueues)

request
https://sqs.us-east-2.amazonaws.com/
?Action=ListQueues
&QueueNamePrefix=M
&Expires=2020-04-18T22%3A52%3A43PST
&Version=2012-11-05
&AUTHPARAMS
response
<ListQueuesResponse>
    <ListQueuesResult>
        <QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
    </ListQueuesResult>
    <ResponseMetadata>
        <RequestId>725275ae-0b9b-4762-b238-436d7c65a1ac</RequestId>
    </ResponseMetadata>
</ListQueuesResponse>

キューの作成 (CreateQueue)

  • キュー自体を作成する

キューの削除 (DeleteQueue)

  • キュー自体を削除する (RDBの "DROP TABLE" 相当)

メッセージの削除 (PurgeQueue)

  • キュー内のメッセージを全て削除する (RDBの "DELETE FROM TABLE" 相当)
  • キューとその属性は残るため、キューを引き続き使用できる
  • 削除に最大1分かかるため、API実行後は1分間待機してから、キューを使用すること
AWS.SimpleQueueService.PurgeQueueInProgress: Only one PurgeQueue operation on <QUEUE_NAME> is allowed every 60 seconds.

メッセージの送信 (SendMessage, SendMessageBatch)

request
https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue/
?Action=SendMessage
&MessageBody=This+is+a+test+message
&MessageAttribute.1.Name=my_attribute_name_1
&MessageAttribute.1.Value.StringValue=my_attribute_value_1
&MessageAttribute.1.Value.DataType=String
&Expires=2020-05-05T22%3A52%3A43PST
&Version=2012-11-05
&AUTHPARAMS
response
<SendMessageResponse>
    <SendMessageResult>
        <MD5OfMessageBody>fafb00f5732ab283681e124bf8747ed1</MD5OfMessageBody>
        <MD5OfMessageAttributes>3ae8f24a165a8cedc005670c81a27295</MD5OfMessageAttributes>
        <MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
    </SendMessageResult>
    <ResponseMetadata>
        <RequestId>27daac76-34dd-47df-bd01-1f6e873584a0</RequestId>
    </ResponseMetadata>
</SendMessageResponse>
Request is throttled
  • 300tpsを超えたアクセスで 403
2021-01-07T01:20:51.237055Z  ERROR - send_message: [403]
HTTP::Headers{
  "x-amzn-RequestId" => "7c9438e4-b40d-589f-a4e1-6094d03e631e",
  "Date" => "Thu, 07 Jan 2021 01:20:51 GMT",
  "Connection" =>"close",
  "Content-Type" => "text/xml",
  "Content-Length" => "273"}
<?xml version="1.0" encoding="UTF-8"?>
<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
   <Error>
      <Type>Sender</Type>
      <Code>RequestThrottled</Code>
      <Message>Request is throttled</Message>
      <Detail />
   </Error>
   <RequestId>7c9438e4-b40d-589f-a4e1-6094d03e631e</RequestId>
</ErrorResponse>

メッセージの受信 (ReceiveMessage)

  • https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
  • メッセージを受信しても、キューにはメッセージが残る
  • 正確には、メッセージデータを受信し、キューのメッセージを一定時間「参照不能」状態にする (インフライト状態)
  • メッセージを受信する度に、ReceiptHandle(受信ハンドル)を属性値から受け取る (最大長は1024文字)
  • ack: DeleteMessageによって明示的にメッセージを削除する
  • nack: Receive後に一定時間経過してもDeleteが実行されない場合、自動的にnackとなりメッセージは再び参照可能になる
  • nackが一定回数を超えた場合、メッセージはdeadletterに自動的に移動する
request
https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue/
?Action=ReceiveMessage
&MaxNumberOfMessages=5
&VisibilityTimeout=15
&AttributeName=All
&Expires=2020-04-18T22%3A52%3A43PST
&Version=2012-11-05
&AUTHPARAMS
response
<ReceiveMessageResponse>
  <ReceiveMessageResult>
    <Message>
      <MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
      <ReceiptHandle>
        MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
        Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
        auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
      </ReceiptHandle>
      <MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
      <Body>This is a test message</Body>
      <Attribute>
        <Name>SenderId</Name>
        <Value>195004372649</Value>
      </Attribute>
      <Attribute>
        <Name>SentTimestamp</Name>
        <Value>1238099229000</Value>
      </Attribute>
      <Attribute>
        <Name>ApproximateReceiveCount</Name>
        <Value>5</Value>
      </Attribute>
      <Attribute>
        <Name>ApproximateFirstReceiveTimestamp</Name>
        <Value>1250700979248</Value>
      </Attribute>
    </Message>
  </ReceiveMessageResult>
  <ResponseMetadata>
    <RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
  </ResponseMetadata>
</ReceiveMessageResponse>

メッセージの削除 (DeleteMessage)

request
https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue/
?Action=DeleteMessage
&ReceiptHandle=MbZj6wDW...
&Expires=2020-04-18T22%3A52%3A43PST
&Version=2012-11-05
&AUTHPARAMS
resopnse
<DeleteMessageResponse>
    <ResponseMetadata>
        <RequestId>b5293cb5-d306-4a17-9048-b263635abe42</RequestId>
    </ResponseMetadata>
</DeleteMessageResponse>

注意点

デッドレターキューと MaximumReceives

  • receive後、deleteされないとリドライブとなり、receive回数が増加する
  • receive回数がMaximumReceivesを超えると、メッセージは自動的にデッドレターキューに移動する
  • consoleでメッセージを参照するとこのreceive回数が増える
  • 例えば、MaximumReceivesが3のキューに対して、処理前にconsoleから3回メッセージを参照してしまうと、処理されずにデッドレターになる

参考

特徴

  • https://github.com/localstack/localstack
  • localから使えるAWSテストフレームワーク (多くのサービスに対応しており、料金を気にせず手元で実行できる)
  • 料金: EC2,S3,SQS,CloudWatchなど多くが無料。RDS, ElastiCache などは有料
  • バージョン: 0.12.3

利用例:SQS

サービス起動

docker-compose.yml
version: "3"
services:
  localstack:
    image: localstack/localstack:0.12.3
    environment:
      SERVICES: sqs
      DEFAULT_REGION: ap-northeast-1
      # HOSTNAME_EXTERNAL: localstack
    volumes:
      - ./localstack:/docker-entrypoint-initaws.d
    ports:
      - "4566-4578:4566-4578"
localstack/init_sqs.sh
  • 初期化用スクリプト ( waiting1.fifo キューを作成)
awslocal sqs create-queue --queue-name waiting1.fifo --attributes "FifoQueue=true"
起動
$ docker-compose up -d

この時点で以下のポートが立ち上がる

  • 4566: LocalStackのエンドポイント
  • 4567-4578: 実際の各サービスのポート

API利用

  • endpoint : http://localhost:4566
  • region : ap-northeast-1 (docker-compose.ymlで指定した値)
  • aws_access_key_id : 使わなれない (~/.aws/credentialsは dummy とかでいい)
  • aws_secret_access_key : 使わなれない (~/.aws/credentialsは dummy とかでいい)

クライアント(aws-cli)

$ aws --endpoint-url http://localhost:4566 --region ap-northeast-1  sqs list-queues
{
    "QueueUrls": [
        "http://localstack:4566/000000000000/waiting1.fifo"
    ]
}

クライアント(Crystal)

client = Aws::SQS::Client.new(region: "ap-northeast-1", aws_access_key: "dummy", aws_secret_key: "dummy", endpoint: "http://localhost:4566")

res = client.send_message(
  queue_url: "http://localhost:4566/000000000000/waiting1.fifo",
  message_body: "Hello LocalStack!",
  message_group_id: "g1",
)
p res.status # => OK

res = client.receive_message(
  queue_url: "http://localhost:4566/000000000000/waiting1.fifo",
)
p consume.messages.map(&.body) # => ["Hello LocalStack!"]

参考

候補

  • SQS (FIFO)
  • RabbitMQ (Amazon MQ)
  • RDS (PostgreSQL)
  • Redis (ElastiCache)

比較

E/O nack 分散 属性 優先 速度 本数 大量 保守 月額 月額x9
SQS(FIFO) 6万 18万
RabbitMQ 💎 10万 ~69万
RDS(PG) 💪 💪 💎💎 5万 20万
ElastiCache 💪 💎 4万 16万

 ✅:OK 💎:お金で解決 💪:腕力で解決 ❌:未対応 ⛔:致命的

  • E/O : exactly onceでconsumeできるか
  • nack : consume後にworker側の処理でエラーになったmsgを再取得できるか
  • 分散 : 地理分散可能か (multi region的な)
  • 属性 : 属性別にキュー内のメッセージ数を集計できるか
  • 優先 : 優先度キューが実現できるか (優先度付きメッセージでも可)
  • 速度 : 大量のメッセージをキューとして負荷なく捌けるか
  • 本数 : キューの本数のスケールに対応できるか (1000~キュー)
  • 大量 : 想定以上のデータが来た場合に処理できるか
  • 保守 : 人的な運用コストが低いか (主にマネージドかどうか)
  • 月額 : 現在の規模でのサービス利用料金 (70万msg/日、ペイロード:4KB)
  • 月額x9 : 現在の9倍の規模でのサービス利用料金 (流量x3、サイズx3)

料金計算

  • [想定] キュー数: 400
  • [想定] 流量: 70万msg/日
  • [想定] ペイロード: 4KB
  • [想定] データ量: 2.8GB (4KB * 700000)
  • [SQS月額] 57032円
    • API(polling): 54812円 (400 * 86400 * 30.5 / 1000000 * 0.5 * 104)
    • API(dequeue): 2220円 (700000 * 2 * 30.5 / 1000000 * 0.5 * 104)
  • [SQS月額x9] 171096円
  • [RabbitMQ月額] 92446円(8GiB)~687158円(64GiB)
    • [東京] EBSストレージ(200GB)。M-AZクラスタ構成(3台契約)
    • ブローカー料金(8GiB): 84958円 = 0.372(USD/h) x 24(時間) x 30.5(日間) x 3(台) x 104(円/USD)
    • ブローカー料金(16GiB): 169917円 = 0.744(USD/h) x 24(時間) x 30.5(日間) x 3(台) x 104(円/USD)
    • ブローカー料金(32GiB): 339835円 = 1.488(USD/h) x 24(時間) x 30.5(日間) x 3(台) x 104(円/USD)
    • ブローカー料金(64GiB): 679670円 = 2.976(USD/h) x 24(時間) x 30.5(日間) x 3(台) x 104(円/USD)
    • ストレージ料金(200GB): 7488円 = 0.12(USD/GB/月) x 200(GB) x 1(ヶ月) x 3(台) x 104(円/USD)
  • [RDS(PG)月額] 44,038円
    • db.r6g.large(16GB): 44,038円 (1台、MultiAZ利用、Disk:100GB)
  • [RDS(PG)月額x9] 193,246円
    • db.r6g.2xlarge(64GB): 193,246円 (1台、MultiAZ利用、Disk:1TB)
  • [ElastiCache月額] 37,656円
    • cache.r6g.large(12GB): 18,828円 x2 = 37,656円
  • [ElastiCache月額x9] 150,172円
    • cache.r6g.2xlarge(52GB): 75,086円 x2 = 150,172円
    • cache.r6g.large ( 13GB)x2 = 3.8万
    • cache.r6g.xlarge ( 26GB)x2 = 7.5万
    • cache.r6g.2xlarge ( 52GB)x2 = 15.0万
    • cache.r6g.4xlarge (105GB)x2 = 30.0万
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment