Skip to content

Instantly share code, notes, and snippets.

@jg75
Last active September 12, 2019 06:07
Show Gist options
  • Save jg75/af86bcc4caec03098d52 to your computer and use it in GitHub Desktop.
Save jg75/af86bcc4caec03098d52 to your computer and use it in GitHub Desktop.
boto3 examples
import boto3
import botocore
from boto3.s3.transfer import S3Transfer
def lookup(s3, bucket_name):
    """Return whether the named bucket appears to exist.

    Issues a HEAD request for the bucket through the resource's
    low-level client.

    Args:
        s3: Object exposing ``meta.client.head_bucket`` (the caller in
            this file passes ``boto3.resource('s3')``).
        bucket_name: Name of the bucket to probe.

    Returns:
        False when the request fails with a 404 error code. True when the
        request succeeds, and also when it fails with any other client
        error code.
    """
    try:
        s3.meta.client.head_bucket(Bucket=bucket_name)
    except botocore.exceptions.ClientError as e:
        # Compare as text: calling int() on a non-numeric error code would
        # raise ValueError and hide the original client error.
        error_code = str(e.response['Error']['Code'])
        if error_code == '404':
            return False
    return True
if __name__ == '__main__':
    # Walk through common S3 operations against a scratch bucket:
    # create/configure the bucket, upload keys, list them, use the
    # S3Transfer helper, then delete everything again.
    s3 = boto3.resource('s3')
    bucket_name = 'test-kick-the-bucket'
    bucket = s3.Bucket(bucket_name)
    cors = bucket.Cors()

    # Cross Origin Resource Sharing rule used by both branches below:
    # allow GET requests from any origin.
    cors_configuration = {
        'CORSRules': [{
            'AllowedMethods': ['GET'],
            'AllowedOrigins': ['*']
        }]
    }

    # Getting the bucket doesn't guarantee that it exists.
    # Check if it's a bucket, if not, then create it
    # and add a CORS rule.
    if not lookup(s3, bucket_name):
        s3.create_bucket(Bucket=bucket_name)

        # Create a new CORS rule
        cors.put(CORSConfiguration=cors_configuration)
    else:
        # Delete any existing CORS rules
        cors.delete()

        # Create a new CORS rule
        cors.put(CORSConfiguration=cors_configuration)

        # Creating a bucket that already exists
        # doesn't actually do anything anyway
        s3.create_bucket(Bucket=bucket_name)

    # Add a key
    key = s3.Object(bucket_name, 'stuff.txt')
    key.put(Body='stuff!')

    # Add a key from a file; the context manager closes the file handle
    # once the upload call returns.
    key = s3.Object(bucket_name, 's3_stuff.py')
    with open('s3_stuff.py', 'rb') as source_file:
        key.put(Body=source_file)

    # Add a key and set access controls to publicly available
    key = s3.Object(bucket_name, 'stuff')
    key.put(Body='stuff!', ACL='public-read')

    # Add a key with some metadata added
    key = s3.Object(bucket_name, 'funny_stuff')
    key.put(Body='stuff!', Metadata={'foo': 'bar'})

    # Iterate all the buckets
    # for bucket in s3.buckets.all():

    # Iterate all the keys, showing the 'foo' metadata value when present
    for key in bucket.objects.all():
        metadata = key.get()['Metadata']
        foo = metadata.get('foo')

        # Single-argument print(...) behaves the same on Python 2 and 3.
        if metadata and foo:
            print("%s: foo=%s" % (key.key, foo))
        else:
            print(key.key)

    # S3Transfer client example
    s3_client = boto3.client('s3')
    s3_transfer = S3Transfer(s3_client)
    s3_transfer.upload_file('./example.txt', bucket.name, 'example.txt')
    s3_transfer.download_file(bucket.name, 'example.txt', './example.txt')

    # Delete all the things! Keys are removed before the bucket itself.
    for key in bucket.objects.all():
        key.delete()

    bucket.delete()
import boto3
# Receive and print messages from the 'test' SQS queue, creating the
# queue first if it does not exist yet.

# Override the region name configured in the default session.
boto3.setup_default_session(region_name='us-east-1')
sqs = boto3.resource('sqs')
queue_name = 'test'

# Check if the queue exists with a 5 second delay,
# if not, then create it with a 5 second delay.
try:
    queue = sqs.get_queue_by_name(QueueName=queue_name)
except sqs.meta.client.exceptions.QueueDoesNotExist:
    # A missing queue is reported as an exception, not as a falsy return
    # value, so creation has to happen here.
    attributes = {'DelaySeconds': '5'}
    queue = sqs.create_queue(
        QueueName=queue_name,
        Attributes=attributes
    )
else:
    # Throw an error if another queue with that name already exists.
    if queue.attributes.get('DelaySeconds') != '5':
        # List all queues in this region. A separate loop variable keeps
        # `queue` bound to the queue that was looked up.
        for existing_queue in sqs.queues.all():
            print("%s %s" % (
                existing_queue.attributes['QueueArn'].split(':')[-1],
                existing_queue.url
            ))

        raise ValueError(': A queue named test is already in use')

for message in queue.receive_messages(MessageAttributeNames=['Author']):
    author = ''

    if message.message_attributes is not None:
        # Default to an empty mapping so a message without an 'Author'
        # attribute does not raise AttributeError.
        author = message.message_attributes.get('Author', {}).get('StringValue')

    # Single-argument print(...) behaves the same on Python 2 and 3.
    print('%' * 30)
    print("%s: %s" % (author, message.body))
    print(message)
    print('%' * 30)

    # Delete the message once it's been processed
    message.delete()
import boto3
# Send one message and then a batch of messages to the 'test' SQS queue,
# creating the queue first if it does not exist yet.

# Override the region name configured in the default session.
boto3.setup_default_session(region_name='us-east-1')
sqs = boto3.resource('sqs')
queue_name = 'test'
queue = None

# Check if the queue exists with a 5 second delay,
# if not, then create it with a 5 second delay.
try:
    queue = sqs.get_queue_by_name(QueueName=queue_name)
except sqs.meta.client.exceptions.QueueDoesNotExist:
    # Only the "queue is missing" failure leads to creation; any other
    # error propagates instead of being swallowed.
    attributes = {'DelaySeconds': '5'}
    queue = sqs.create_queue(
        QueueName=queue_name,
        Attributes=attributes
    )
else:
    # Throw an error if another queue with that name already exists.
    # This check sits outside the try body so the ValueError actually
    # reaches the caller.
    if queue.attributes.get('DelaySeconds') != '5':
        raise ValueError(': A queue named test is already in use')

# List all queues in this region. A separate loop variable keeps `queue`
# bound to the queue that the messages below are sent to.
for existing_queue in sqs.queues.all():
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print("%s %s" % (
        existing_queue.attributes['QueueArn'].split(':')[-1],
        existing_queue.url
    ))

# Send a message onto the queue
response = queue.send_message(
    MessageBody='hello world',
    MessageAttributes={
        'Author': {
            'StringValue': 'Viking Pirate Ninja Rock God',
            'DataType': 'String'
        }
    }
)

# Print the response data if the request failed
if response.get('ResponseMetadata', {}).get('HTTPStatusCode') != 200:
    print(response)

# Create a bunch of message entries (ids Blah-1 through Blah-9)
entries = [
    {
        'Id': "Blah-%s" % i,
        'MessageBody': 'all kinds of stuff',
        'MessageAttributes': {
            'Author': {
                'StringValue': 'Viking Pirate Ninja Rock God',
                'DataType': 'String'
            }
        }
    }
    for i in range(1, 10)
]

# Batch the messages onto the queue
response = queue.send_messages(Entries=entries)

# Print the response data for failed messages
failed = response.get('Failed')

if failed:
    print(failed)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment