Skip to content

Instantly share code, notes, and snippets.

@mjgp2 mjgp2/create-queue.sh
Last active Jan 7, 2018

Embed
What would you like to do?
Instantly mirror an S3 bucket to a local directory, e.g. for ELB logs to be consumed by logstash
# Usage: create-queue.sh <s3_bucket_name> <sns_topic_name>
#
# Connects an S3 bucket to an SQS queue through an SNS topic: creates the
# topic, lets the bucket publish to it, creates a queue named after the topic,
# lets the topic send to the queue, and subscribes the queue to the topic.

# Stop at the first failed command or unset variable; every later step needs
# the ARN/URL captured by an earlier one, so carrying on after a failure would
# only send empty values to AWS.
set -eu

if [ "$#" -lt 2 ] || [ -z "$1" ] || [ -z "$2" ]; then
  printf 'Usage: %s <s3_bucket_name> <sns_topic_name>\n' "$0" >&2
  exit 2
fi

region=us-east-1
s3_bucket_name=$1
sns_topic_name=$2
# The queue takes the same name as the topic.
sqs_queue_name=$sns_topic_name

# create the SNS topic
sns_topic_arn=$(aws sns create-topic \
  --region "$region" \
  --name "$sns_topic_name" \
  --output text \
  --query 'TopicArn')
echo "sns_topic_arn=$sns_topic_arn"
# Allow S3 to publish to the SNS topic for activity in the specific S3 bucket.
# The policy is built in a here-document so the topic ARN and bucket name are
# substituted without word-splitting or pathname expansion (the previous form
# spliced them in unquoted between single-quoted fragments).
sns_policy=$(cat <<EOF
{
  "Version": "2008-10-17",
  "Id": "s3-publish-to-sns",
  "Statement": [{
    "Effect": "Allow",
    "Principal": { "AWS" : "*" },
    "Action": [ "SNS:Publish" ],
    "Resource": "$sns_topic_arn",
    "Condition": {
      "ArnLike": {
        "aws:SourceArn": "arn:aws:s3:*:*:$s3_bucket_name"
      }
    }
  }]
}
EOF
)
# Pass the same region used to create the topic so the call does not depend on
# the CLI's configured default region.
aws sns set-topic-attributes \
  --region "$region" \
  --topic-arn "$sns_topic_arn" \
  --attribute-name Policy \
  --attribute-value "$sns_policy"
# Add a notification to the S3 bucket so that it sends messages to the SNS topic when objects are created (or updated).
# Uses put-bucket-notification-configuration (the replacement for the
# deprecated put-bucket-notification), whose document lists topics under
# "TopicConfigurations" with a "TopicArn" field.
# NOTE: like the call it replaces, this overwrites the bucket's whole
# notification configuration.
notification_config=$(cat <<EOF
{
  "TopicConfigurations": [{
    "TopicArn": "$sns_topic_arn",
    "Events": [ "s3:ObjectCreated:*" ]
  }]
}
EOF
)
aws s3api put-bucket-notification-configuration \
  --region "$region" \
  --bucket "$s3_bucket_name" \
  --notification-configuration "$notification_config"
# Create the SQS queue
# --region is passed explicitly so the queue lands in the same region as the
# topic instead of whatever the CLI's default region happens to be.
sqs_queue_url=$(aws sqs create-queue \
  --region "$region" \
  --queue-name "$sqs_queue_name" \
  --attributes 'ReceiveMessageWaitTimeSeconds=20,VisibilityTimeout=300' \
  --output text \
  --query 'QueueUrl')
echo "sqs_queue_url=$sqs_queue_url"

# create-queue only returns the URL; look up the ARN, which the queue policy
# and the SNS subscription both need.
sqs_queue_arn=$(aws sqs get-queue-attributes \
  --region "$region" \
  --queue-url "$sqs_queue_url" \
  --attribute-names QueueArn \
  --output text \
  --query 'Attributes.QueueArn')
echo "sqs_queue_arn=$sqs_queue_arn"
# Give the SNS topic permission to post to the SQS queue.
# The policy is assembled as a single-line JSON document: it has to be embedded
# as a string inside the --attributes JSON, where raw newlines are not allowed.
# printf repeats its '%s' format for every argument, so the fragments are
# simply concatenated (no word-splitting or globbing of the policy text).
sqs_policy=$(printf '%s' \
  '{"Version":"2012-10-17","Statement":[{' \
  '"Effect":"Allow",' \
  '"Principal":{"AWS":"*"},' \
  '"Action":"sqs:SendMessage",' \
  "\"Resource\":\"$sqs_queue_arn\"," \
  "\"Condition\":{\"ArnEquals\":{\"aws:SourceArn\":\"$sns_topic_arn\"}}" \
  '}]}')
# Escape backslashes and double quotes so the policy survives being nested
# inside another JSON string.
sqs_policy_escaped=$(printf '%s' "$sqs_policy" | sed 's/[\\"]/\\&/g')
sqs_attributes="{\"Policy\":\"$sqs_policy_escaped\"}"
aws sqs set-queue-attributes \
  --region "$region" \
  --queue-url "$sqs_queue_url" \
  --attributes "$sqs_attributes"
# Subscribe the SQS queue to the SNS topic.
# --region matches the region the topic was created in, so the call does not
# depend on the CLI's configured default region.
aws sns subscribe \
  --region "$region" \
  --topic-arn "$sns_topic_arn" \
  --protocol sqs \
  --notification-endpoint "$sqs_queue_arn"
#!/usr/bin/env python
import boto.sqs, os, json, sys
from boto.s3.connection import S3Connection
from boto.s3.key import Key
try:  # Python 3
    from urllib.parse import unquote_plus
except ImportError:  # Python 2
    from urllib import unquote_plus


def decode_object_key(raw_key):
    """Return the real S3 key for a key taken from an S3 event notification.

    S3 URL-encodes object keys in event notifications (a space arrives as
    '+', ':' as '%3A', ...), so the raw value must be decoded before it is
    used to fetch the object or to build a local path.
    """
    if not isinstance(raw_key, str):
        # Python 2: json yields `unicode`; decode the percent-escapes on the
        # UTF-8 bytes so multi-byte characters are not mangled.
        raw_key = raw_key.encode("utf-8")
    return unquote_plus(raw_key)


def object_keys_from_body(body):
    """Extract the decoded object keys from one SQS message body.

    `body` is the JSON text of an SNS envelope whose "Message" field is itself
    the JSON text of an S3 event. Events without a "Records" list (for example
    the "s3:TestEvent" S3 publishes when a notification is configured) yield
    an empty list instead of raising KeyError.
    """
    event = json.loads(json.loads(body)["Message"])
    return [decode_object_key(record["s3"]["object"]["key"])
            for record in event.get("Records", [])]


def local_path_for(basedir, name):
    """Return the local file path that mirrors object key `name` under `basedir`."""
    return "%s/%s" % (basedir, name)


def mirror_object(bucket, basedir, name):
    """Download object `name` from `bucket` to its mirrored path under `basedir`."""
    f = local_path_for(basedir, name)
    d = os.path.dirname(f)
    if not os.path.exists(d):
        os.makedirs(d)
    if f.endswith("/"):
        # A key ending in "/" maps to a directory path, not a file; the
        # directory now exists, and trying to download into it would fail.
        return
    print("Downloading %s" % f)
    Key(bucket=bucket, name=name).get_contents_to_filename(f)
    print("Done")


def main(argv):
    """Poll the queue forever, mirroring every object named in each message.

    argv is [program, basedir, queue_name, bucket_name]. Credentials are read
    from the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.
    """
    if len(argv) != 4:
        print("Usage: python mirror_bucket.py [basedir] [queue_name] [bucket_name]")
        sys.exit(1)
    basedir = argv[1]
    queue_name = argv[2]
    bucket_name = argv[3]
    print("Mirroring files from %s to %s" % (bucket_name, basedir))

    access_key = os.environ['AWS_ACCESS_KEY_ID']
    secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
    sqs_connection = boto.sqs.connect_to_region(
        "us-east-1",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key)
    s3_connection = S3Connection(access_key, secret_key)

    queue = sqs_connection.get_queue(queue_name)
    if queue is None:
        # Without this check the loop below would die with AttributeError.
        print("Queue %s not found" % queue_name)
        sys.exit(1)
    bucket = s3_connection.get_bucket(bucket_name)

    while True:
        m = queue.read(60, message_attributes=["s3"])
        if m is None:
            continue
        for name in object_keys_from_body(m.get_body()):
            mirror_object(bucket, basedir, name)
        # NOTE(review): the listing this was restored from had lost its
        # indentation; deleting the message once all of its records have been
        # downloaded is the reconstructed nesting - confirm.
        queue.delete_message(m)


if __name__ == "__main__":
    main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.