Created
June 12, 2024 22:11
-
-
Save thomasvincent/2bb5cf173756df8b7d8d784e2382560a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Author: Thomas Vincent | |
Date: June 12, 2024 | |
Gist: https://gist.github.com/thomasvincent/2bb5cf173756df8b7d8d784e2382560a | |
Description: This script moves files from Amazon S3 to an SFTP server, triggered by S3 events or by retrying failed messages from an SQS queue. | |
""" | |
import argparse
import json
import logging
import os
import sys
import tempfile
from typing import Tuple
from urllib.parse import unquote_plus

import boto3
import pysftp
from botocore.exceptions import ClientError
# Configure root logging once for the whole script (Lambda captures stdout/stderr).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Make a bundled copy of third-party packages importable (Lambda deployments
# often vendor dependencies under ./vendored next to the handler file).
# NOTE(review): this append runs *after* the top-of-file `import pysftp`, so it
# cannot influence that import — confirm whether the vendored copy is actually used.
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "vendored"))

# SFTP / SQS configuration, read from the environment with placeholder
# defaults intended only for local testing.
SFTP_HOST = os.environ.get('SFTP_HOST', 'your_sftp_host')
SFTP_PORT = int(os.environ.get('SFTP_PORT', 22))  # raises ValueError at import time if non-numeric
SFTP_USER = os.environ.get('SFTP_USER', 'your_sftp_user')
SFTP_PRIVATE_KEY_PATH = os.environ.get('SFTP_PRIVATE_KEY_PATH', '/path/to/your/private/key.pem')
SFTP_REMOTE_DIRECTORY = os.environ.get('SFTP_REMOTE_DIRECTORY', '/')  # root for all uploads
SQS_QUEUE_NAME = os.environ.get('SQS_QUEUE_NAME', 'your_sqs_queue_name')
def lambda_handler(event: dict, context):
    """Lambda entry point.

    Dispatches each S3 record in an event to :func:`process_s3_object`;
    an event with no ``Records`` triggers a retry pass over the SQS queue.

    Args:
        event: The Lambda invocation payload (S3 notification or arbitrary dict).
        context: Lambda context object (unused).
    """
    if 'Records' in event:
        for record in event['Records']:
            # .get() avoids a KeyError on records that lack 'eventSource'.
            if record.get('eventSource') == "aws:s3":
                s3_bucket = record['s3']['bucket']['name']
                # S3 notification keys are URL-encoded (e.g. spaces arrive
                # as '+'); decode before using the key against the S3 API.
                s3_key = unquote_plus(record['s3']['object']['key'])
                process_s3_object(s3_bucket, s3_key)
            else:
                logging.warning("Skipping non-S3 record: %s", record.get('eventSource'))
    else:
        retry_failed_messages()
def process_s3_object(s3_bucket: str, s3_key: str):
    """Move one S3 object to the SFTP server via a local staging directory.

    Downloads the object into a throwaway temp directory, uploads it to the
    SFTP server under the same key, and re-raises any expected transfer
    error after logging it so Lambda's retry machinery can kick in.

    Args:
        s3_bucket: Source bucket name.
        s3_key: Object key; reused as the SFTP remote path.

    Raises:
        ClientError, OSError, pysftp exceptions: on any transfer failure.
    """
    expected_errors = (
        ClientError,
        OSError,
        pysftp.ConnectionException,
        pysftp.CredentialException,
        pysftp.SSHException,
        pysftp.AuthenticationException,
    )
    try:
        with tempfile.TemporaryDirectory() as staging_dir:
            staged_file = download_s3_object(s3_bucket, s3_key, staging_dir)
            upload_file_to_sftp(staged_file, s3_key)
    except expected_errors as e:
        logging.exception(f"Error processing {s3_key}: {e}")
        raise  # Re-raise for Lambda error handling and potential retry
def download_s3_object(s3_bucket: str, s3_key: str, tmpdir: str) -> str:
    """Download an S3 object into *tmpdir*, mirroring the key's path.

    Args:
        s3_bucket: Source bucket name.
        s3_key: Object key (may contain '/' separators).
        tmpdir: Existing local directory to stage the file in.

    Returns:
        The local filesystem path of the downloaded file.
    """
    s3 = boto3.client('s3')
    local_path = os.path.join(tmpdir, s3_key)
    # Keys like 'a/b/c.txt' map to nested local paths; download_file raises
    # FileNotFoundError unless the intermediate directories already exist.
    parent_dir = os.path.dirname(local_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    s3.download_file(s3_bucket, s3_key, local_path)
    logging.info(f'Downloaded {s3_key} from {s3_bucket} to {local_path}')
    return local_path
def upload_file_to_sftp(local_path: str, remote_path: str):
    """Upload a local file to the SFTP server under SFTP_REMOTE_DIRECTORY.

    Args:
        local_path: Path of the local file to upload.
        remote_path: Destination path relative to SFTP_REMOTE_DIRECTORY
            (directory components are created as needed).
    """
    with pysftp.Connection(SFTP_HOST, port=SFTP_PORT, username=SFTP_USER, private_key=SFTP_PRIVATE_KEY_PATH) as sftp:
        with sftp.cd(SFTP_REMOTE_DIRECTORY):
            # Create the directory tree *inside* the target root. The
            # previous version called makedirs before cd(), so nested keys
            # were created relative to the login directory instead.
            remote_dir = os.path.dirname(remote_path)
            if remote_dir:  # skip makedirs('') for top-level keys
                sftp.makedirs(remote_dir)
            sftp.put(local_path, remote_path)
    logging.info(f'Uploaded {local_path} to {remote_path} on SFTP server')
def retry_failed_messages():
    """Drain the SQS queue, re-processing each failed event message.

    Each message body is expected to be a JSON-encoded Lambda event with a
    'Records' list. Successfully processed and unparseable/malformed
    messages are deleted; messages that fail processing stay on the queue
    for a later retry (after their visibility timeout).
    """
    sqs = boto3.client('sqs')
    queue_url = sqs.get_queue_url(QueueName=SQS_QUEUE_NAME)['QueueUrl']
    while True:
        response = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)
        if 'Messages' not in response:
            break  # queue drained
        for message in response['Messages']:
            try:
                event = json.loads(message['Body'])
                if 'Records' not in event:
                    # Guard: an event without 'Records' would make
                    # lambda_handler call retry_failed_messages again,
                    # recursing indefinitely. Such a message can never
                    # succeed, so delete it as a poison message.
                    logging.error("Deleting malformed SQS message (no 'Records'): %s",
                                  message.get('MessageId'))
                    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
                    continue
                lambda_handler(event, None)  # Simulate Lambda invocation
                sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
            except Exception as e:
                # Best-effort retry loop: log and move on; the message
                # becomes visible again after its visibility timeout.
                logging.error(f"Error processing SQS message: {e}")
def split_s3_path(s3_path: str) -> Tuple[str, str]:
    """Split an S3 path of the form 'bucket/key' into (bucket, key).

    An optional leading 's3://' scheme prefix is accepted and stripped.

    Args:
        s3_path: The S3 path, e.g. 'my_bucket/path/to/key'.

    Returns:
        A (bucket, key) tuple; the key may itself contain '/'.

    Raises:
        ValueError: If the path contains no '/' separating bucket from key
            (previously this raised a bare IndexError).
    """
    if s3_path.startswith('s3://'):
        s3_path = s3_path[len('s3://'):]
    bucket, sep, key = s3_path.partition('/')
    if not sep:
        raise ValueError(f"Expected '<bucket>/<key>', got {s3_path!r}")
    return bucket, key
if __name__ == '__main__':
    # CLI entry point: move a single S3 object to the SFTP server.
    cli = argparse.ArgumentParser(description='Move a file from S3 to an SFTP server')
    cli.add_argument('s3_path', help='The full path to the S3 object (e.g., my_bucket/path/to/key)')
    options = cli.parse_args()
    bucket, key = split_s3_path(options.s3_path)
    process_s3_object(bucket, key)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment