Create an AWS SQS queue with an existing AWS S3 bucket's all-object-created event notification
import boto3 | |
import json | |
import argparse | |
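def _build_queue_policy(
    queue_arn: str,
    account_id: str,
    partition: str,
    s3_bucket_name: str,
    log_processor_user_name: str,
) -> dict:
    """Return the SQS resource policy for the notification queue.

    Two statements:
      * The S3 service (``s3.amazonaws.com``) may ``SQS:SendMessage``, but only
        when the call originates from ``account_id`` and from bucket
        ``s3_bucket_name`` (``aws:SourceAccount`` / ``aws:SourceArn``); without
        both conditions a bucket in any account could deliver events into this
        queue.
      * The IAM user ``log_processor_user_name`` in the same account may
        receive, delete and re-hide messages, and nothing else.

    ``partition`` (``aws``, ``aws-cn``, ``aws-us-gov``) is used for the ARNs
    the policy builds itself; ``queue_arn`` is passed in as returned by SQS.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "s3-notification-statement",
                "Effect": "Allow",
                "Principal": {"Service": "s3.amazonaws.com"},
                "Action": "SQS:SendMessage",
                "Resource": queue_arn,
                "Condition": {
                    "StringEquals": {"aws:SourceAccount": account_id},
                    "ArnLike": {
                        "aws:SourceArn": f"arn:{partition}:s3:*:*:{s3_bucket_name}"
                    },
                },
            },
            {
                "Sid": "__receiver_statement",
                "Effect": "Allow",
                "Principal": {
                    "AWS": f"arn:{partition}:iam::{account_id}:user/{log_processor_user_name}"
                },
                "Action": [
                    "SQS:ChangeMessageVisibility",
                    "SQS:DeleteMessage",
                    "SQS:ReceiveMessage",
                ],
                "Resource": queue_arn,
            },
        ],
    }


def _merge_queue_notification(existing: dict, queue_arn: str, events: list[str]) -> dict:
    """Return ``existing`` with a QueueConfiguration for ``queue_arn`` added.

    ``existing`` is a GetBucketNotificationConfiguration response.
    PutBucketNotificationConfiguration replaces the bucket's *whole*
    configuration, so every key of the current one (Topic-, LambdaFunction-,
    EventBridge- and other Queue- configurations) is carried over unchanged;
    only the ``ResponseMetadata`` that botocore attaches to the response is
    dropped. An unfiltered entry for the same queue and the same events is
    replaced rather than duplicated so the function can be re-run safely.
    The input is not mutated.
    """
    merged = {key: value for key, value in existing.items() if key != "ResponseMetadata"}
    wanted = sorted(events)

    def is_same_target(cfg: dict) -> bool:
        # Same destination, same events, no prefix/suffix filter -> replace it.
        return (
            cfg.get("QueueArn") == queue_arn
            and sorted(cfg.get("Events", [])) == wanted
            and "Filter" not in cfg
        )

    queues = [cfg for cfg in merged.get("QueueConfigurations", []) if not is_same_target(cfg)]
    queues.append({"QueueArn": queue_arn, "Events": list(events)})
    merged["QueueConfigurations"] = queues
    return merged


def create_sqs_queue_with_policy(
    log_processor_user_name: str,
    region_name: str,
    s3_bucket_name: str,
    sqs_queue_name: str,
    retention_period: int,
) -> str:
    """Create an SQS queue that receives ``s3:ObjectCreated:*`` events from a bucket.

    Steps:
      1. Resolve the calling account and its partition via STS.
      2. Create the queue in ``region_name`` and read its ARN back from SQS.
      3. Attach a queue policy that lets only the S3 service, acting for this
         bucket in this account, send messages, and lets the IAM user
         ``log_processor_user_name`` consume them.
      4. Add a QueueConfiguration to the bucket's notification configuration,
         keeping whatever notifications the bucket already has.

    Args:
        log_processor_user_name: IAM user (same account) that will consume the queue.
        region_name: Region the queue is created in.
        s3_bucket_name: Existing bucket whose object-created events are subscribed.
        sqs_queue_name: Name of the queue to create.
        retention_period: ``MessageRetentionPeriod`` for the queue, in seconds.

    Returns:
        The URL of the created queue.

    Raises:
        botocore.exceptions.ClientError: propagated from any AWS call, including
            an S3 rejection of the merged notification configuration (for
            example when it overlaps an existing unfiltered rule).
    """
    # The partition comes from the caller's own ARN (arn:<partition>:...), which
    # keeps the ARNs built below valid outside the commercial "aws" partition.
    caller = boto3.client("sts").get_caller_identity()
    account_id = caller["Account"]
    partition = caller["Arn"].split(":")[1]
    print(f"Account ID: {account_id}")

    sqs = boto3.client("sqs", region_name=region_name)
    sqs_queue_url = sqs.create_queue(QueueName=sqs_queue_name)["QueueUrl"]
    print(f"SQS queue created: {sqs_queue_url}.")

    # Ask the service for the ARN rather than formatting it by hand.
    sqs_arn = sqs.get_queue_attributes(
        QueueUrl=sqs_queue_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]

    sqs_policy = _build_queue_policy(
        sqs_arn, account_id, partition, s3_bucket_name, log_processor_user_name
    )
    sqs.set_queue_attributes(
        QueueUrl=sqs_queue_url,
        Attributes={
            "Policy": json.dumps(sqs_policy),
            "MessageRetentionPeriod": str(retention_period),
        },
    )

    # S3 delivers only to a queue in the bucket's own region; the bucket is
    # assumed to live in ``region_name`` -- TODO confirm before cross-region use.
    s3 = boto3.client("s3")
    existing = s3.get_bucket_notification_configuration(Bucket=s3_bucket_name)
    s3.put_bucket_notification_configuration(
        Bucket=s3_bucket_name,
        NotificationConfiguration=_merge_queue_notification(
            existing, sqs_arn, ["s3:ObjectCreated:*"]
        ),
    )
    print(
        f"S3 bucket {s3_bucket_name} notifications added to SQS queue {sqs_queue_name} allowing user {log_processor_user_name} to process logs."
    )
    return sqs_queue_url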
if __name__ == "__main__":
    # Positional CLI arguments, in the order create_sqs_queue_with_policy
    # expects them: (name, converter, help text).
    _ARG_SPECS = (
        ("log_processor_user_name", str, "the name of the IAM user that will process the logs"),
        ("region_name", str, "the name of the AWS region to create the SQS queue in"),
        ("s3_bucket_name", str, "the name of the S3 bucket to attach the SQS queue to"),
        ("sqs_queue_name", str, "the name of the SQS queue to create and attach to the S3 bucket"),
        ("retention_period", int, "the retention period (in seconds) for messages in the SQS queue"),
    )
    cli = argparse.ArgumentParser(
        description="Create an SQS queue with a policy and attach it to an S3 bucket."
    )
    for arg_name, arg_type, arg_help in _ARG_SPECS:
        cli.add_argument(arg_name, type=arg_type, help=arg_help)
    opts = cli.parse_args()
    create_sqs_queue_with_policy(
        opts.log_processor_user_name,
        opts.region_name,
        opts.s3_bucket_name,
        opts.sqs_queue_name,
        opts.retention_period,
    )