@ryandhubbard
Created October 27, 2023 22:48
This script connects to an SFTP server and copies its files to an S3 bucket.
import paramiko
import urllib.parse
import stat
import boto3
# import logging
import os
# SFTP credentials (fill these in; note: no trailing slash on the hostname,
# or URL parsing and the SSH connection below will misbehave)
username = ""
password = ""
hostname = "sftp.domain.com"
port = 22
# AWS S3 configuration
s3_bucket_name = "bucket"
s3 = boto3.client('s3', aws_access_key_id='', aws_secret_access_key='')
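# Optional sketch: rather than hardcoding keys, boto3 can pick up credentials
# from its standard chain (environment variables, ~/.aws/credentials, or an
# instance role); how you supply them is an assumption about your deployment.
# s3 = boto3.client('s3')  # reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY etc.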
# URL-encode the password so special characters survive inside the URL
encoded_password = urllib.parse.quote(password)

# Construct the SFTP URL (careful: printing it exposes the password)
sftp_url = f"sftp://{username}:{encoded_password}@{hostname}:{port}"
print("SFTP URL:", sftp_url)

# Now parse the URL back into its components
parsed_url = urllib.parse.urlparse(sftp_url)
print("Parsed URL:", parsed_url)

username = parsed_url.username
password = urllib.parse.unquote(parsed_url.password) if parsed_url.password is not None else None
hostname = parsed_url.hostname
port = parsed_url.port or 22  # fall back to the default SFTP port if not specified
client = paramiko.SSHClient()
# AutoAddPolicy blindly trusts unknown host keys; fine for a one-off script,
# but consider client.load_system_host_keys() for anything long-lived
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname, port=port, username=username, password=password)
sftp = client.open_sftp()
# logging.basicConfig(level=logging.DEBUG)
# paramiko.util.log_to_file("paramikologs.log")
def get_s3_path(base_path, filename):
    # Route each file into an S3 subfolder based on substrings in its name
    # (os.path.join is fine for S3 keys on a POSIX host, where it joins with "/")
    if 'foo' in filename:
        return os.path.join(base_path, 'conversions', filename)
    elif 'boo' in filename:
        return os.path.join(base_path, 'sub_stats', filename)
    elif 'test' in filename:
        return os.path.join(base_path, 'creative', filename)
    else:
        return os.path.join(base_path, filename)
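# For example, given the routing above (a worked example with made-up filenames):
#   get_s3_path("folder/prefix", "foo_2023.csv") -> "folder/prefix/conversions/foo_2023.csv"
#   get_s3_path("folder/prefix", "report.csv")   -> "folder/prefix/report.csv"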
def upload_to_s3(local_file, s3_path):
    s3.upload_file(local_file, s3_bucket_name, s3_path)
    print(f"File {local_file} uploaded to s3://{s3_bucket_name}/{s3_path}")
def list_files_recursive_and_upload(sftp, directory=".", s3_prefix=""):
    try:
        for item in sftp.listdir_attr(directory):
            path = f"{directory}/{item.filename}" if directory != "." else item.filename
            s3_path = get_s3_path(s3_prefix, item.filename)
            if stat.S_ISDIR(item.st_mode):
                # If it's a directory, recurse into it
                print(f"Entering directory: {path}")
                list_files_recursive_and_upload(sftp, path, s3_path)
            else:
                # Download the file from the SFTP server into the temp directory
                local_file_path = os.path.join("temp", item.filename)
                sftp.get(path, local_file_path)
                print(f"File downloaded: {local_file_path}")
                # Upload the file to S3
                upload_to_s3(local_file_path, s3_path)
                # Delete the local copy
                os.remove(local_file_path)
                print(f"Local file deleted: {local_file_path}")
    except Exception as e:
        print(f"Failed to list/upload from {directory} to {s3_prefix}: {e}")
# Create a temporary directory to hold files before they are uploaded to S3
os.makedirs('temp', exist_ok=True)

# Start listing files from the root directory and uploading them to S3
list_files_recursive_and_upload(sftp, s3_prefix="folder/prefix")

# Close the SFTP session and SSH client
sftp.close()
client.close()
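# Note: if anything above raised before the close() calls, the connection
# would stay open. A stricter variant of the same flow (a sketch) wraps the
# transfer in try/finally:
#
# try:
#     list_files_recursive_and_upload(sftp, s3_prefix="folder/prefix")
# finally:
#     sftp.close()
#     client.close()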