Skip to content

Instantly share code, notes, and snippets.

@is3ka1
Created March 22, 2023 04:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save is3ka1/d1f114ae36d3be8ea5493532592dd2df to your computer and use it in GitHub Desktop.
Save is3ka1/d1f114ae36d3be8ea5493532592dd2df to your computer and use it in GitHub Desktop.
Create AWS SQS with existed AWS S3 Bucket all-object-created event notification
import boto3
import json
import argparse
def create_sqs_queue_with_policy(
log_processor_user_name: str,
region_name: str,
s3_bucket_name: str,
sqs_queue_name: str,
retention_period: int,
):
# Get account id from sts
sts = boto3.client("sts")
account_id = sts.get_caller_identity()["Account"]
print(f"Account ID: {account_id}")
# Create an SQS client with region
sqs = boto3.client("sqs", region_name=region_name)
# Create an SQS queue and set region
response = sqs.create_queue(QueueName=sqs_queue_name)
sqs_queue_url = response["QueueUrl"]
print(f"SQS queue created: {sqs_queue_url}.")
# Get sqs arn
sqs_arn = f"arn:aws:sqs:{region_name}:{account_id}:{sqs_queue_name}"
# Define the SQS policy
sqs_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "s3-notification-statement",
"Effect": "Allow",
"Principal": {"Service": "s3.amazonaws.com"},
"Action": "SQS:SendMessage",
"Resource": sqs_arn,
"Condition": {
"StringEquals": {"aws:SourceAccount": account_id},
"ArnLike": {"aws:SourceArn": f"arn:aws:s3:*:*:{s3_bucket_name}"},
},
},
{
"Sid": "__receiver_statement",
"Effect": "Allow",
"Principal": {
"AWS": f"arn:aws:iam::{account_id}:user/{log_processor_user_name}"
},
"Action": [
"SQS:ChangeMessageVisibility",
"SQS:DeleteMessage",
"SQS:ReceiveMessage",
],
"Resource": sqs_arn,
},
],
}
# Convert the policy to a JSON string
sqs_policy_json = json.dumps(sqs_policy)
# Set the SQS queue attributes
sqs.set_queue_attributes(
QueueUrl=sqs_queue_url,
Attributes={
"Policy": sqs_policy_json,
"MessageRetentionPeriod": str(retention_period),
},
)
# Add S3 bucket notification configuration to SQS queue
s3 = boto3.client("s3")
s3.put_bucket_notification_configuration(
Bucket=s3_bucket_name,
NotificationConfiguration={
"QueueConfigurations": [
{"QueueArn": sqs_arn, "Events": ["s3:ObjectCreated:*"]}
]
},
)
print(
f"S3 bucket {s3_bucket_name} notifications added to SQS queue {sqs_queue_name} allowing user {log_processor_user_name} to process logs."
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Create an SQS queue with a policy and attach it to an S3 bucket."
)
parser.add_argument(
"log_processor_user_name",
type=str,
help="the name of the IAM user that will process the logs",
)
parser.add_argument(
"region_name",
type=str,
help="the name of the AWS region to create the SQS queue in",
)
parser.add_argument(
"s3_bucket_name",
type=str,
help="the name of the S3 bucket to attach the SQS queue to",
)
parser.add_argument(
"sqs_queue_name",
type=str,
help="the name of the SQS queue to create and attach to the S3 bucket",
)
parser.add_argument(
"retention_period",
type=int,
help="the retention period (in seconds) for messages in the SQS queue",
)
args = parser.parse_args()
create_sqs_queue_with_policy(
args.log_processor_user_name,
args.region_name,
args.s3_bucket_name,
args.sqs_queue_name,
args.retention_period,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment