Skip to content

Instantly share code, notes, and snippets.

@suthagar23
Last active June 5, 2022 17:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save suthagar23/ea336510af495b860fe3ea5b5922d8a5 to your computer and use it in GitHub Desktop.
Save suthagar23/ea336510af495b860fe3ea5b5922d8a5 to your computer and use it in GitHub Desktop.
AWS S3-SQS-Download
import json
import logging
import os
import sys
import tarfile
import traceback
from urllib.parse import unquote_plus

import boto3
# Uncomment to send DEBUG-level log output to stderr while troubleshooting:
#logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

# Local root under which objects are stored; the directory part of the S3 key
# is appended to it verbatim.
_LOCAL_PREFIX = '/your_prefix_dir/'


def _parse_s3_location(message_body):
    """Return ``(bucket, key)`` for an S3 event body, or ``None`` if unrecognised.

    Two layouts are understood: a body with a top-level ``detail`` section and
    a body with a ``Records`` list (only the first record is read).

    Raises:
        KeyError, IndexError, TypeError: if a recognised layout is missing the
            expected ``bucket``/``object`` fields.
    """
    if 'detail' in message_body:
        detail = message_body['detail']
        # NOTE(review): the key is used as delivered here; confirm whether
        # this event source URL-encodes object keys as well.
        return detail['bucket']['name'], detail['object']['key']
    if 'Records' in message_body:
        s3_section = message_body['Records'][0]['s3']
        # S3 event notifications URL-encode the object key (a space arrives
        # as '+'), so decode it before using it as the real object key.
        return (
            s3_section['bucket']['name'],
            unquote_plus(s3_section['object']['key']),
        )
    return None


def _download_and_extract(s3_client, bucket, key):
    """Download ``key`` from ``bucket``, unpack it as tar.gz, drop the archive.

    The archive is stored in ``_LOCAL_PREFIX + dirname(key)`` and extracted
    into that same directory.
    """
    # Get filename and directory from key
    filename = os.path.basename(key)
    directory = _LOCAL_PREFIX + os.path.dirname(key)
    logging.debug('filename :%s', filename)
    logging.debug('directory :%s', directory)

    # Create the directory if it does not exist yet (no check-then-create race).
    os.makedirs(directory, exist_ok=True)

    # Join with a separator so the archive lands inside `directory` instead of
    # next to it under a fused "<dirname><filename>" name.
    archive_path = os.path.join(directory, filename)

    # Download Log File
    s3_client.download_file(bucket, key, archive_path)
    logging.debug('Download completed')

    # Extract tar.gz File; the context manager closes the archive even when
    # extraction fails.
    # NOTE(review): extractall() trusts member paths inside the archive; if
    # the bucket can contain untrusted uploads, validate members or use an
    # extraction filter on Python versions that support it.
    with tarfile.open(archive_path, "r:gz") as archive:
        archive.extractall(directory)
    logging.debug('extract file completed')

    # Remove tar.gz File
    os.remove(archive_path)


def _handle_message(s3_client, message):
    """Process one SQS message; return True only if its object was fetched.

    A body that is not valid JSON is logged and the message is deleted. A JSON
    body without a ``detail`` or ``Records`` part is skipped and the message is
    left on the queue. Otherwise the referenced object is downloaded and
    extracted, and the message is deleted afterwards.
    """
    try:
        message_body = json.loads(message.body)
    except ValueError:
        # The queue is dedicated to S3 event messages. Anything that is not
        # even JSON came from elsewhere: record the body, then drop it. Only
        # the parse is guarded so unrelated ValueErrors raised later never
        # delete a valid message.
        logging.error('Message format is not valid. Delete message :%s', message.body)
        message.delete()
        return False

    location = _parse_s3_location(message_body)
    if location is None:
        logging.debug("Skipping the message - does not have detail or Records part!")
        return False

    bucket, key = location
    logging.debug('bucket :%s', bucket)
    logging.debug('key :%s', key)

    _download_and_extract(s3_client, bucket, key)

    # Remove SQS message only after download and extraction succeeded.
    message.delete()
    return True


# Get the service resource
sqs = boto3.resource('sqs', region_name='eu-central-1')
# NOTE(review): avoid committing real keys here; omitting the two key
# arguments lets boto3 resolve credentials itself:
# s3_client = boto3.client('s3', region_name='eu-central-1')
s3_client = boto3.client('s3', region_name='eu-central-1', aws_access_key_id='xxx', aws_secret_access_key='xxx')
# Get the queue
queue = sqs.get_queue_by_name(QueueName='Your SQS Queue Name')

for message in queue.receive_messages(MaxNumberOfMessages=10):
    try:
        processed = _handle_message(s3_client, message)
    except Exception:
        # Top-level boundary: log the failure and move on to the next message.
        # The failed message is not deleted, so it stays on the queue.
        logging.error(traceback.format_exc())
    else:
        if processed:
            logging.info('finish')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment