Skip to content

Instantly share code, notes, and snippets.

@civic
Last active January 22, 2023 02:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save civic/65a336fa4312946448f19d3999088730 to your computer and use it in GitHub Desktop.
Save civic/65a336fa4312946448f19d3999088730 to your computer and use it in GitHub Desktop.
producer.py
import os
import boto3
import json
import logging
from mypy_boto3_sqs.client import SQSClient
# Configure the root logger so INFO-level records from this script are emitted.
logging.basicConfig(level=logging.INFO)
# URL of the target queue, read from the environment; a missing
# MY_QUEUE_URL raises KeyError here, as soon as the module is loaded.
queue_url: str = os.environ["MY_QUEUE_URL"]
# Shared SQS client used by send_message.
# NOTE(review): region and credentials are presumably resolved by boto3's
# default configuration chain — confirm for the deployment environment.
client: SQSClient = boto3.client("sqs")
def send_message(no: int) -> None:
    """Send one numbered message to the queue and log its message ID.

    The message body is the JSON object ``{"no": no}``. Every message is
    sent with the message group ``"my-group"`` and uses ``no`` rendered as
    text for its deduplication ID.

    Args:
        no: Sequence number carried in the message body.
    """
    payload = json.dumps({"no": no})
    # NOTE(review): the group/deduplication parameters suggest a FIFO queue;
    # re-sending the same ``no`` shortly after a previous run is presumably
    # deduplicated by SQS — confirm against the queue configuration.
    response = client.send_message(
        QueueUrl=queue_url,
        MessageGroupId="my-group",
        MessageDeduplicationId=f"{no}",
        MessageBody=payload,
    )
    message_id = response["MessageId"]
    logging.info("Sent message: no:%s\tMsgId:%s", no, message_id)
if __name__ == "__main__":
    # Script entry point: send ten messages numbered 1 through 10,
    # logging once before the first send and once after the last.
    logging.info("Start to put SQS messages.")
    for number in range(1, 11):
        send_message(no=number)
    logging.info("End to put SQS messages.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment