Skip to content

Instantly share code, notes, and snippets.

@alukach
Last active December 23, 2021 06:58
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alukach/20b17d4deff4f8b17d2d1cf7490fe1c0 to your computer and use it in GitHub Desktop.
Save alukach/20b17d4deff4f8b17d2d1cf7490fe1c0 to your computer and use it in GitHub Desktop.
AWS S3 Batch Operation boilerplate
import urllib
import boto3
from botocore.exceptions import ClientError
# Module-level S3 resource handle, created once at import time; get_s3_object
# uses it to build object references.
s3 = boto3.resource("s3")
# Values placed in the "resultCode" field of a task result (see build_result).
# handler uses TMP_FAILURE for S3 "RequestTimeout" errors, FAILURE for every
# other error, and SUCCESS when processing completes without raising.
TMP_FAILURE = "TemporaryFailure"
FAILURE = "PermanentFailure"
SUCCESS = "Succeeded"
def process_object(src_object):
    """Run the per-object processing task and return a result message.

    This is the placeholder to fill in with real work. The current stub
    ignores its argument and returns a fixed string.

    Args:
        src_object: The object reference that ``handler`` obtained from
            ``get_s3_object``.

    Returns:
        A message string; ``handler`` reports it as the task's
        ``resultString`` on success.
    """
    return "TODO: Populate with processing task..."
def get_s3_object(event):
    """Build an S3 object reference for the first task of a batch event.

    Args:
        event: The invocation payload. ``event["tasks"][0]`` must contain
            ``s3Key`` (URL-encoded object key) and ``s3BucketArn``.

    Returns:
        ``s3.Object(bucket, key)`` where ``bucket`` is the part of the ARN
        after the ``:::`` separator and ``key`` is the URL-decoded object key.

    Raises:
        KeyError: If ``tasks``, ``s3Key`` or ``s3BucketArn`` is missing.
        IndexError: If ``tasks`` is empty.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` has been loaded.
    from urllib.parse import unquote

    task = event["tasks"][0]
    key = unquote(task["s3Key"])
    # The bucket name is whatever follows ":::" in the bucket ARN.
    bucket = task["s3BucketArn"].split(":::")[-1]
    # NOTE(review): the task's "s3VersionId" is not applied, so the returned
    # reference is not pinned to a specific object version -- confirm that
    # this is intended for versioned buckets.
    return s3.Object(bucket, key)
def build_result(status: str, msg: str) -> dict:
    """Return the result fields for a single task.

    Args:
        status: Result code for the task (one of ``SUCCESS``,
            ``TMP_FAILURE`` or ``FAILURE`` when called from ``handler``).
        msg: Human-readable message describing the outcome.

    Returns:
        A dict with the keys ``resultCode`` and ``resultString``.
    """
    return {"resultCode": status, "resultString": msg}
def handler(event, context):
    """Lambda entry point: process the first task of a batch invocation.

    Args:
        event: The invocation payload. Must contain ``invocationId``,
            ``invocationSchemaVersion`` and a non-empty ``tasks`` list whose
            first entry has a ``taskId``.
        context: Lambda context object (unused).

    Returns:
        A dict echoing ``invocationId`` and ``invocationSchemaVersion``, with
        ``treatMissingKeysAs`` set to ``FAILURE`` and a one-element
        ``results`` list holding the task's ``taskId``, ``resultCode`` and
        ``resultString``.

    Raises:
        KeyError: If the envelope fields or ``taskId`` are missing; without
            them no result can be attributed to a task.
        IndexError: If ``tasks`` is empty.
    """
    task_id = event["tasks"][0]["taskId"]
    job_params = {
        "invocationId": event["invocationId"],
        "invocationSchemaVersion": event["invocationSchemaVersion"],
    }

    try:
        # Resolve the object inside the try so that a malformed task is
        # reported as a failed task instead of crashing the invocation.
        s3_object = get_s3_object(event)
        output = process_object(s3_object)
        result = build_result(SUCCESS, output)
    except ClientError as e:
        # Read the error payload defensively: an exception raised inside this
        # handler would not be caught by the ``except Exception`` below.
        error = e.response.get("Error", {})
        error_code = error.get("Code", "Unknown")
        error_message = error.get("Message", "")
        if error_code == "RequestTimeout":
            # A timed-out request is reported as a temporary failure so the
            # task can be retried; every other client error is permanent.
            result = build_result(
                TMP_FAILURE, "Retry request to Amazon S3 due to timeout."
            )
        else:
            result = build_result(FAILURE, f"{error_code}: {error_message}")
    except Exception as e:
        # Top-level boundary: any other error permanently fails the task.
        result = build_result(FAILURE, f"Exception: {e}")

    return {
        **job_params,
        "treatMissingKeysAs": FAILURE,
        "results": [{**result, "taskId": task_id}],
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment