"""
Author: Thomas Vincent
Date: June 12, 2024
Gist: https://gist.github.com/thomasvincent/2bb5cf173756df8b7d8d784e2382560a
Description: This script moves files from Amazon S3 to an SFTP server, triggered by S3 events or by retrying failed messages from an SQS queue.
"""
import argparse
import json
import logging
import os
import sys
import tempfile
from typing import Tuple
from urllib.parse import unquote_plus

# Make the vendored dependencies (e.g. pysftp) importable before the
# third-party imports below.
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "vendored"))

import boto3
import pysftp
from botocore.exceptions import ClientError

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load environment variables or set defaults for local testing
SFTP_HOST = os.environ.get('SFTP_HOST', 'your_sftp_host')
SFTP_PORT = int(os.environ.get('SFTP_PORT', 22))
SFTP_USER = os.environ.get('SFTP_USER', 'your_sftp_user')
SFTP_PRIVATE_KEY_PATH = os.environ.get('SFTP_PRIVATE_KEY_PATH', '/path/to/your/private/key.pem')
SFTP_REMOTE_DIRECTORY = os.environ.get('SFTP_REMOTE_DIRECTORY', '/')
SQS_QUEUE_NAME = os.environ.get('SQS_QUEUE_NAME', 'your_sqs_queue_name')
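
# Illustrative environment configuration for a real deployment (all values
# below are placeholders, not settings from this gist):
#
#   export SFTP_HOST=sftp.example.com
#   export SFTP_USER=deploy
#   export SFTP_PRIVATE_KEY_PATH=/etc/keys/sftp.pem
#   export SQS_QUEUE_NAME=s3-to-sftp-retries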


def lambda_handler(event: dict, context):
    """Lambda function entry point for handling S3 events or SQS messages."""
    if 'Records' in event:
        for record in event['Records']:
            if record['eventSource'] == "aws:s3":
                s3_bucket = record['s3']['bucket']['name']
                # S3 event notifications URL-encode object keys.
                s3_key = unquote_plus(record['s3']['object']['key'])
                process_s3_object(s3_bucket, s3_key)
    else:
        retry_failed_messages()


def process_s3_object(s3_bucket: str, s3_key: str):
    """Orchestrates the download, upload, and error handling for an S3 object."""
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            local_path = download_s3_object(s3_bucket, s3_key, tmpdir)
            upload_file_to_sftp(local_path, s3_key)
    except (ClientError, OSError, pysftp.ConnectionException, pysftp.CredentialException,
            pysftp.SSHException, pysftp.AuthenticationException) as e:
        logging.exception(f"Error processing {s3_key}: {e}")
        raise  # Re-raise for Lambda error handling and potential retry


def download_s3_object(s3_bucket: str, s3_key: str, tmpdir: str) -> str:
    """Downloads an S3 object to a local temporary file."""
    s3 = boto3.client('s3')
    local_path = os.path.join(tmpdir, s3_key)
    # Keys may contain slashes, so create any intermediate directories first.
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    s3.download_file(s3_bucket, s3_key, local_path)
    logging.info(f'Downloaded {s3_key} from {s3_bucket} to {local_path}')
    return local_path


def upload_file_to_sftp(local_path: str, remote_path: str):
    """Uploads a file to the SFTP server."""
    with pysftp.Connection(SFTP_HOST, port=SFTP_PORT, username=SFTP_USER,
                           private_key=SFTP_PRIVATE_KEY_PATH) as sftp:
        with sftp.cd(SFTP_REMOTE_DIRECTORY):
            # Create the remote directory inside the target directory so that
            # makedirs and put resolve relative to the same location.
            remote_dir = os.path.dirname(remote_path)
            if remote_dir:
                sftp.makedirs(remote_dir)  # Ensure remote directory exists
            sftp.put(local_path, remote_path)
    logging.info(f'Uploaded {local_path} to {remote_path} on SFTP server')


def retry_failed_messages():
    """Retrieves and re-processes failed messages from an SQS queue."""
    sqs = boto3.client('sqs')
    queue_url = sqs.get_queue_url(QueueName=SQS_QUEUE_NAME)['QueueUrl']
    while True:
        # Long-poll briefly so an empty short poll doesn't end the loop
        # while messages are still in flight.
        response = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10,
                                       WaitTimeSeconds=5)
        if 'Messages' not in response:
            break
        for message in response['Messages']:
            try:
                event = json.loads(message['Body'])
                lambda_handler(event, None)  # Simulate Lambda invocation
                sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
            except Exception as e:
                logging.exception(f"Error processing SQS message: {e}")


def split_s3_path(s3_path: str) -> Tuple[str, str]:
    """Splits an S3 path of the form 'bucket/path/to/key' into (bucket, key)."""
    bucket, _, key = s3_path.partition('/')
    if not key:
        raise ValueError(f"Expected a path of the form 'bucket/path/to/key', got: {s3_path}")
    return bucket, key
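
# For example, split_s3_path('example-bucket/path/to/file.txt') returns
# ('example-bucket', 'path/to/file.txt'); the path here is illustrative.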


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Move a file from S3 to an SFTP server')
    parser.add_argument('s3_path', help='The full path to the S3 object (e.g., my_bucket/path/to/key)')
    args = parser.parse_args()
    s3_bucket, s3_key = split_s3_path(args.s3_path)
    process_s3_object(s3_bucket, s3_key)
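
# Example local invocation (the filename and S3 path are illustrative; save
# this gist under any name, e.g. s3_to_sftp.py):
#
#   python s3_to_sftp.py example-bucket/path/to/file.txt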