Skip to content

Instantly share code, notes, and snippets.

@radzhome
Created January 18, 2019 06:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save radzhome/d488185dd4a88780d245eca0b708568f to your computer and use it in GitHub Desktop.
Save radzhome/d488185dd4a88780d245eca0b708568f to your computer and use it in GitHub Desktop.
S3 and SQS boto3 clients
from __future__ import unicode_literals
"""
S3 bucket CRUD operations core module
"""
import logging
import time
import boto3
import botocore
from botocore.client import Config
class S3Client(object):  # pragma: no cover
    """
    Wrapper around a boto3 S3 resource bound to a single bucket.

    Encapsulates listing, reading, writing, uploading, downloading and
    deleting keys, together with logging and bounded retry handling.
    Excluded from unit test coverage; it is meant to be exercised by
    integration tests because it talks to an external service.
    """
    S3_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.000Z'  # Not used
    RECONNECT_SLEEP_SECS = 0.5
    CONN_RETRIES = 10
    CONN_CONFIG = Config(connect_timeout=5, retries={'max_attempts': 0})
    # S3 DeleteObjects accepts at most this many keys per request
    MAX_DELETE_BATCH = 1000

    def __init__(self, config, reconnect_sleep_secs=RECONNECT_SLEEP_SECS, conn_retries=CONN_RETRIES):
        """
        Store the configuration and open the S3 connection.

        :param config: dict, must contain bucket_name, access_key_id,
            secret_access_key and aws_region
        :param reconnect_sleep_secs: float, pause before each reconnect attempt
        :param conn_retries: int, number of consecutive failures tolerated
            before an error is propagated to the caller
        :raises KeyError: if a required config key is missing
        :return: None
        """
        self.config = config
        self.bucket_name = self.config['bucket_name']
        self.access_key_id = self.config['access_key_id']
        self.secret_access_key = self.config['secret_access_key']
        self.aws_region = self.config['aws_region']
        self.RECONNECT_SLEEP_SECS = reconnect_sleep_secs
        self.CONN_RETRIES = conn_retries
        self.connection_attempt = 0
        self.connection = None
        self.bucket = None
        self.connect()

    def _loggable_config(self):
        """
        Return a copy of the config that is safe to write to logs.

        :return: dict with secret_access_key masked, or the raw value when
            the config is not a dict
        """
        if not isinstance(self.config, dict):
            return self.config
        redacted = dict(self.config)
        if 'secret_access_key' in redacted:
            redacted['secret_access_key'] = '***'
        return redacted

    def connect(self):
        """
        Create the boto3 S3 resource for self.aws_region and bind the bucket.

        Called from __init__ and again by read/download before a retry.
        Failures are logged and swallowed until CONN_RETRIES consecutive
        attempts have failed, at which point the error is re-raised.
        :return: None
        """
        try:
            self.connection_attempt += 1
            self.connection = boto3.resource('s3', region_name=self.aws_region,
                                             aws_access_key_id=self.access_key_id,
                                             aws_secret_access_key=self.secret_access_key,
                                             config=self.CONN_CONFIG)
            self._get_bucket()
        except Exception as e:
            # Never log the raw config: it carries the secret access key
            logging.exception("S3Client.connect failed with params {}, error {}".format(
                self._loggable_config(), e))
            if self.connection_attempt >= self.CONN_RETRIES:
                raise
        else:
            # Only consecutive failures should count towards the limit
            self.connection_attempt = 0

    def _get_bucket(self):
        """
        Bind self.bucket to the configured bucket on the current connection.

        :return: None
        """
        try:
            self.bucket = self.connection.Bucket(name=self.bucket_name)
        except Exception as e:
            # I.e. gaierror: [Errno -2] Name or service not known
            logging.exception("S3Client.get_bucket unable to get bucket {}, error {}".format(self.bucket_name, e))
            raise

    def list(self):
        """
        List contents of the bucket.

        :return: list of s3.ObjectSummary
        """
        return list(self.bucket.objects.all())

    def read(self, key):
        """
        Fetch a key from the bucket and return its contents decoded as UTF-8.

        On any error the connection is re-established and the read retried,
        up to CONN_RETRIES attempts in total; the last error is then raised
        instead of retrying forever.
        :param key: str, bucket key filename
        :return: str, contents of key
        """
        failures = 0
        while True:
            try:
                obj = self.connection.Object(self.bucket_name, key)
                return obj.get()['Body'].read().decode('utf-8')
            except Exception as e:  # Retry in-case we have a connection error
                failures += 1
                logging.exception("S3Client.read failed for key {}, error {}".format(key, e))
                if failures >= self.CONN_RETRIES:
                    raise
            time.sleep(self.RECONNECT_SLEEP_SECS)
            self.connect()

    def write(self, key, contents):
        """
        Write a string to a key in the bucket.

        :param contents: str, contents to save to a file
        :param key: str, bucket key filename
        :return: dict with 'file_name' on success, None when the write failed
        """
        output = response = None
        try:
            response = self.connection.Object(self.bucket_name, key).put(Body=contents)
            output = {
                'file_name': key,
            }
        except Exception as e:
            logging.exception("S3Client.write failed for key {}, error {}, response {}".format(key, e, response))
        return output

    def upload(self, key, origin_path):
        """
        Upload a local file to a key in the bucket.

        :param origin_path: str, path to origin filename
        :param key: str, bucket key filename
        :return: bool, True on success, False if the upload failed
        """
        try:
            # Context manager guarantees the file handle is closed
            with open(origin_path, 'rb') as file_body:
                self.connection.Bucket(self.bucket_name).put_object(Key=key, Body=file_body)
        except Exception as e:
            logging.exception("S3Client.upload failed for key {}, error {} ".format(key, e))
            return False
        return True

    def download(self, key, destination):
        """
        Download a key from the bucket to a local file.

        A missing key (404) is logged and reported as failure. Other client
        errors are raised. Any other error triggers a reconnect and retry,
        up to CONN_RETRIES attempts in total.
        :param destination: str, path to local file name
        :param key: str, bucket key filename
        :return: bool, True on success, False if the key does not exist
        """
        failures = 0
        while True:
            try:
                self.bucket.download_file(key, destination)
                return True
            except botocore.exceptions.ClientError as e:
                if e.response['Error']['Code'] == "404":
                    logging.error("S3Client.download bucket missing key file {}".format(key))
                    return False
                raise
            except Exception as e:
                failures += 1
                logging.warning("S3Client.download failed for key {} to {}, error {}, retrying".format(
                    key, destination, e))
                if failures >= self.CONN_RETRIES:
                    raise
            time.sleep(self.RECONNECT_SLEEP_SECS)
            self.connect()

    def remove(self, keys):
        """
        Delete the given keys from the bucket.

        Keys are sent in batches of MAX_DELETE_BATCH because a single
        DeleteObjects request accepts at most 1000 keys. An empty input
        sends no request at all.
        :param keys: list, list of key names
        :return: bool, True if no batch reported per-key errors
        """
        logging.warning("S3Client.remove deleting keys {}".format(keys))
        pending = list(keys)
        succeeded = True
        for start in range(0, len(pending), self.MAX_DELETE_BATCH):
            batch = pending[start:start + self.MAX_DELETE_BATCH]
            response = self.bucket.delete_objects(
                Delete={'Objects': [{'Key': key} for key in batch]})
            errors = (response or {}).get('Errors')
            if errors:
                logging.error("S3Client.remove failed to delete some keys: {}".format(errors))
                succeeded = False
        return succeeded
if __name__ == "__main__":
    # TODO: turn into tests
    # Manual smoke run against a real bucket; fill in real credentials first.
    demo_config = dict(
        access_key_id='<test key id>',
        secret_access_key='<test access key>',
        aws_region='ca-central-1',
        bucket_name='aws-web-distro',
    )
    demo_client = S3Client(config=demo_config)

    bucket_listing = demo_client.list()
    print(bucket_listing)

    # Optional: demo_client.upload('test2.jpg', '/Users/rad/Desktop/test.jpg')
    readonly_contents = demo_client.read('readonly-access/readonly.txt')
    print(readonly_contents)

    # Optional: print(demo_client.write('new-test-key.txt', 'this is some data'))
    removal_result = demo_client.remove(keys=['test.jpg', 'test2.jpg'])
    print(removal_result)
    # Optional: demo_client.download('new-test-key.txt', 'my_local_image.jpg')
from __future__ import unicode_literals
"""
SQS feed queue core class
Sample SQS message from S3 file created trigger:
{
"Records": [
{
"eventVersion": "2.0",
"eventSource": "aws:s3",
"awsRegion": "us-west-2",
"eventTime": "2009-03-21T03:24:48.558Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "AWS:someid"
},
"requestParameters": {
"sourceIPAddress": "1.2.3.4"
},
"responseElements": {
"x-amz-request-id": "some-request-id",
"x-amz-id-2": "some-id"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "triggerName",
"bucket": {
"name": "us-west-2-feeds-uat",
"ownerIdentity": {
"principalId": "some-id"
},
"arn": "arn:aws:s3:::us-west-2-bucket"
},
"object": {
"key": "some file",
"size": 1319506,
"eTag": "some-tag",
"sequencer": "some-sequence"
}
}
}
]
}
"""
import logging
import time
import boto3
from botocore import exceptions
# Try to get ujson if available
try:
import ujson as json
except ImportError:
import json
class SQSClient(object):
    """
    Wrapper around a boto3 SQS client bound to a single queue URL.

    Encapsulates receiving, deleting and sending messages with logging
    and bounded reconnect/retry handling. Intended to be covered by
    integration tests rather than unit tests, since it talks to an
    external service.
    """
    # Request timeout to poll for msg, must be 0 to 20, poll seconds
    MSG_WAIT_SECONDS = 20
    # Make message invisible to other consumers. Defaults via SQS to 30, visibility timeout
    MSG_INVISIBLE_SECONDS = 14
    RECONNECT_SLEEP_SECS = 0.5
    CONN_RETRIES = 20

    def __init__(self, config, msg_wait_seconds=MSG_WAIT_SECONDS,
                 msg_invisible_seconds=MSG_INVISIBLE_SECONDS, reconnect_sleep_secs=RECONNECT_SLEEP_SECS,
                 conn_retries=CONN_RETRIES):
        """
        Store the configuration and create the SQS client.

        A missing or malformed config is logged and the connection settings
        fall back to None (best effort); the tunables passed as keyword
        arguments are applied regardless.
        :param config: dict with access_key_id, secret_access_key,
            aws_region and queue_url
        :param msg_wait_seconds: int, default long-poll wait (0 to 20)
        :param msg_invisible_seconds: int, default visibility timeout
        :param reconnect_sleep_secs: float, pause before each reconnect
        :param conn_retries: int, failures tolerated before raising
        :return: None
        """
        # Tunables do not depend on config, so a bad config must not discard them
        self.MSG_WAIT_SECONDS = msg_wait_seconds
        self.MSG_INVISIBLE_SECONDS = msg_invisible_seconds
        self.RECONNECT_SLEEP_SECS = reconnect_sleep_secs
        self.CONN_RETRIES = conn_retries
        try:
            self.config = config
            self.access_key_id = self.config['access_key_id']
            self.secret_access_key = self.config['secret_access_key']
            self.aws_region = self.config['aws_region']
            self.queue_url = self.config['queue_url']
        except Exception as e:
            logging.exception("SQSClient.__init__ configuration error {}".format(e))
            self.access_key_id = None
            self.secret_access_key = None
            self.aws_region = None
            self.queue_url = None
            self.config = None
        self.connection_attempt = 0
        self.client = None
        self.connect()

    def _loggable_config(self):
        """
        Return a copy of the config that is safe to write to logs.

        :return: dict with secret_access_key masked, or the raw value when
            the config is not a dict
        """
        if not isinstance(self.config, dict):
            return self.config
        redacted = dict(self.config)
        if 'secret_access_key' in redacted:
            redacted['secret_access_key'] = '***'
        return redacted

    def connect(self):
        """
        Establish SQS connection.

        Failures are logged and swallowed until CONN_RETRIES consecutive
        attempts have failed, at which point the error is re-raised.
        :return: None
        """
        try:
            self.client = boto3.client('sqs',
                                       region_name=self.aws_region,
                                       aws_access_key_id=self.access_key_id,
                                       aws_secret_access_key=self.secret_access_key)
            self.connection_attempt = 0  # Got queue connection, reset retries
        except Exception as e:
            # Never log the raw config: it carries the secret access key
            logging.exception("SQSClient.connect failed with params {}, error {}".format(
                self._loggable_config(), e))
            self.connection_attempt += 1
            if self.connection_attempt >= self.CONN_RETRIES:
                raise

    def get_messages(self, num_messages=1, msg_invisible_seconds=None, msg_wait_seconds=None):
        """
        Get messages from the SQS queue using long polling.

        On error the client reconnects and the same request (with the same
        arguments) is retried, up to CONN_RETRIES attempts in total; the
        last error is then raised.
        :param num_messages: int, number of messages to get, max is 10
        :param msg_wait_seconds: int, time to wait (poll time between retries),
            defaults to self.MSG_WAIT_SECONDS
        :param msg_invisible_seconds: int, how long the message is invisible
            to other consumers, defaults to self.MSG_INVISIBLE_SECONDS
        :return: list of SQS message dicts, empty if none were received
        """
        if msg_invisible_seconds is None:
            msg_invisible_seconds = self.MSG_INVISIBLE_SECONDS
        if msg_wait_seconds is None:
            msg_wait_seconds = self.MSG_WAIT_SECONDS

        failures = 0
        while True:
            try:
                response = self.client.receive_message(MaxNumberOfMessages=num_messages,
                                                       QueueUrl=self.queue_url,
                                                       WaitTimeSeconds=msg_wait_seconds,
                                                       VisibilityTimeout=msg_invisible_seconds) or {}
                return response.get('Messages') or []
            except Exception as e:
                # I.e. gaierror: [Errno -2] Name or service not known
                failures += 1
                logging.exception("SQSClient.get_messages error, retrying. {}".format(e))
                if failures >= self.CONN_RETRIES:
                    raise
            time.sleep(self.RECONNECT_SLEEP_SECS)
            self.connect()

    def delete_message(self, sqs_message):
        """
        Delete a message from the queue.

        An invalid receipt handle is treated as success, since it means the
        message was already deleted. Other errors trigger a reconnect and
        retry, up to CONN_RETRIES attempts in total.
        :param sqs_message: dict, message as returned by get_messages; its
            'ReceiptHandle' entry identifies the message to delete
        :raises KeyError: if the message has no 'ReceiptHandle'
        :return: bool, True once the message is deleted or already gone
        """
        try:
            receipt_handle = sqs_message['ReceiptHandle']
        except KeyError:
            logging.error("SQSClient.delete_message missing 'ReceiptHandle' key in message, required for delete")
            raise

        failures = 0
        while True:
            try:
                self.client.delete_message(QueueUrl=self.queue_url,
                                           ReceiptHandle=receipt_handle)
                return True
            except self.client.exceptions.ReceiptHandleIsInvalid:
                # Message was already deleted, handle no longer valid, old msg
                return True
            except Exception as e:
                failures += 1
                logging.exception("SQSClient.delete_message error, retrying. {}".format(e))
                if failures >= self.CONN_RETRIES:
                    raise
            time.sleep(self.RECONNECT_SLEEP_SECS)
            self.connect()

    def send_message(self, body, delay_seconds=0):
        """
        For testing only, send a message.

        :param body: str or dict, message content; a dict is serialized to JSON
        :param delay_seconds: int, delay before the message becomes visible
        :return: the response returned by the client's send_message call
        """
        if isinstance(body, dict):
            body = json.dumps(body)
        return self.client.send_message(
            QueueUrl=self.queue_url,
            MessageBody=body,
            DelaySeconds=delay_seconds,
        )
if __name__ == "__main__":
    # TODO: Turn into tests
    # Manual smoke run against a real queue; fill in real credentials first.
    conf = {'access_key_id': '<test key id>',
            'secret_access_key': '<test access key>',
            'aws_region': 'ca-central-1',
            'queue_url': 'https://sqs.ca-central-1.amazonaws.com/584374059506/django-content-services-applenews'}
    client = SQSClient(conf, conn_retries=1, msg_wait_seconds=10)

    # Create some messages
    client.send_message(body={'test': 'body'})
    client.send_message(body={'test': 'body'})

    msgs = client.get_messages(num_messages=2)
    assert isinstance(msgs, list)
    assert msgs
    assert len(msgs) >= 1, len(msgs)  # Could be 1 or 2
    assert isinstance(msgs[0], dict), type(msgs[0])

    is_deleted = [client.delete_message(msg) for msg in msgs]
    assert all(is_deleted)

    # Clean up all remaining messages. Fetch before deleting so the batch
    # that was just deleted above is not deleted a second time.
    msgs = client.get_messages(msg_wait_seconds=1)
    while msgs:
        for msg in msgs:
            client.delete_message(msg)
        msgs = client.get_messages(msg_wait_seconds=1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment