@truevis
Created June 18, 2025 08:53
Upload new or changed files to FTP
import os
import ftplib
import json
from datetime import datetime, date
import fnmatch

# Import FTP credentials from configuration file
from ftp_config import FTP_HOST, FTP_USER, FTP_PASS

# --- Local Configuration ---
# The script will upload files from the directory it is run in.
# It will maintain the directory structure on the remote server.
STATE_FILE = "upload_state.json"
REMOTE_BASE_DIR = "/"  # e.g., "/public_html/" or "/" for root

# --- Exclusions ---
# Files and directories to exclude from the upload.
# These are combined with patterns from .gitignore
EXCLUDE_FILES = [os.path.basename(__file__), STATE_FILE, 'package-lock.json', '.gitignore']
EXCLUDE_DIRS = ['.git', 'node_modules']


def load_gitignore_patterns(gitignore_path=".gitignore"):
    """
    Loads and returns patterns from a .gitignore file.
    Handles comments and empty lines.
    """
    patterns = []
    if os.path.exists(gitignore_path):
        with open(gitignore_path, 'r') as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#'):
                    patterns.append(line)
    return patterns


def is_path_ignored(path, ignored_patterns):
    """
    Checks if a given path matches any of the gitignore patterns.
    This is a simplified implementation that handles common cases.
    """
    path = path.replace('\\', '/')
    if path.startswith('./'):
        path = path[2:]
    for pattern in ignored_patterns:
        # Match against the full path
        if fnmatch.fnmatch(path, pattern):
            return True
        # Match just the filename/directory name part
        if fnmatch.fnmatch(os.path.basename(path), pattern):
            return True
        # Handle directory patterns like 'logs/' which should match 'logs' and everything inside
        if pattern.endswith('/') and (path == pattern.rstrip('/') or path.startswith(pattern)):
            return True
    return False


def get_last_uploaded_times():
    """Loads file modification times from the state file."""
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE, 'r') as f:
            try:
                return json.load(f)
            except json.JSONDecodeError:
                return {}
    return {}


def save_last_uploaded_times(uploaded_files):
    """Saves file modification times to the state file."""
    with open(STATE_FILE, 'w') as f:
        json.dump(uploaded_files, f, indent=4)


def ftp_connect():
    """Establishes and returns an FTP connection."""
    try:
        ftp = ftplib.FTP(FTP_HOST)
        ftp.login(FTP_USER, FTP_PASS)
        if REMOTE_BASE_DIR and REMOTE_BASE_DIR != '/':
            try:
                ftp.cwd(REMOTE_BASE_DIR)
            except ftplib.error_perm:
                print(f"Warning: Remote base directory '{REMOTE_BASE_DIR}' does not exist. Attempting to create it.")
                try:
                    ftp.mkd(REMOTE_BASE_DIR)
                    ftp.cwd(REMOTE_BASE_DIR)
                except ftplib.error_perm as e:
                    print(f"Error: Could not create or access remote base directory '{REMOTE_BASE_DIR}'. Aborting. Error: {e}")
                    ftp.quit()
                    return None
        return ftp
    except ftplib.all_errors as e:
        print(f"FTP connection failed: {e}")
        return None


def ensure_remote_dir(ftp, remote_path):
    """Ensures the remote directory structure exists, creating it if necessary."""
    remote_dir = os.path.dirname(remote_path).replace('\\', '/')
    if remote_dir:
        # `cwd` back to the base directory before creating the new path
        if REMOTE_BASE_DIR and REMOTE_BASE_DIR != '/':
            ftp.cwd(REMOTE_BASE_DIR)
        else:
            ftp.cwd('/')  # Go to root
        path_parts = remote_dir.split('/')
        for part in path_parts:
            if part:
                try:
                    ftp.cwd(part)
                except ftplib.error_perm:
                    try:
                        ftp.mkd(part)
                        ftp.cwd(part)
                    except ftplib.error_perm as e:
                        print(f"Error: Could not create or access remote directory '{part}'. Error: {e}")
                        return False
    return True


def upload_file(ftp, local_path, remote_path):
    """Uploads a single file."""
    try:
        # Ensure the target directory exists on the remote server
        if not ensure_remote_dir(ftp, remote_path):
            print(f"Skipping upload for {local_path} due to directory creation failure.")
            return False
        # Go back to the base directory to perform the upload
        if REMOTE_BASE_DIR and REMOTE_BASE_DIR != '/':
            ftp.cwd(REMOTE_BASE_DIR)
        else:
            ftp.cwd('/')  # Go to root
        with open(local_path, 'rb') as f:
            ftp.storbinary(f'STOR {remote_path}', f)
        print(f"Successfully uploaded: {local_path} -> {remote_path}")
        return True
    except FileNotFoundError:
        # Catch this before ftplib.all_errors, which includes OSError and would otherwise shadow it
        print(f"Error: Local file not found: {local_path}")
        return False
    except ftplib.all_errors as e:
        print(f"Error uploading {local_path}: {e}")
        return False


def get_files_to_upload():
    """Scans the local directory and returns a list of new or modified files from today, respecting .gitignore."""
    last_uploaded = get_last_uploaded_times()
    files_to_upload = []
    current_mtimes = {}
    today = date.today()
    # Load all ignore patterns from .gitignore and static config
    gitignore_patterns = load_gitignore_patterns()
    static_exclusions = EXCLUDE_FILES + [d + '/' for d in EXCLUDE_DIRS]
    all_ignored_patterns = gitignore_patterns + static_exclusions
    for root, dirs, files in os.walk(".", topdown=True):
        # First, filter directories based on ignore patterns
        # We need to check the full path of the directory
        original_dirs = list(dirs)
        dirs[:] = []  # Clear the list and re-populate
        for d in original_dirs:
            dir_path = os.path.join(root, d)
            if not is_path_ignored(dir_path, all_ignored_patterns):
                dirs.append(d)
        # Then, check each file
        for filename in files:
            local_path = os.path.join(root, filename)
            if is_path_ignored(local_path, all_ignored_patterns):
                continue
            # Normalize path for state file consistency
            local_path_norm = local_path.replace('\\', '/')
            if local_path_norm.startswith('./'):
                local_path_norm = local_path_norm[2:]
            try:
                mod_time = os.path.getmtime(local_path)
                mod_date = datetime.fromtimestamp(mod_time).date()
                current_mtimes[local_path_norm] = mod_time
                # Only consider files modified today
                if mod_date == today:
                    # And check if they are new or updated since the last upload
                    if local_path_norm not in last_uploaded or last_uploaded[local_path_norm] < mod_time:
                        files_to_upload.append(local_path_norm)
            except FileNotFoundError:
                print(f"Warning: File disappeared during scan: {local_path_norm}")
    return files_to_upload, current_mtimes


def main():
    """Main function to orchestrate the FTP upload process."""
    files_to_upload, current_mtimes = get_files_to_upload()
    if not files_to_upload:
        print("No new or modified files to upload. Everything is up to date.")
        return
    print(f"Found {len(files_to_upload)} files to upload:")
    for f in files_to_upload:
        print(f" - {f}")
    try:
        confirm = input("\nDo you want to upload these files? [Y/n] ").strip().lower()
        if confirm == 'n':
            print("Upload cancelled by user.")
            return
    except (KeyboardInterrupt, EOFError):
        print("\nUpload cancelled.")
        return
    ftp = ftp_connect()
    if not ftp:
        return
    successful_uploads = {}
    for local_path in files_to_upload:
        remote_path = local_path  # remote path mirrors local path
        if upload_file(ftp, local_path, remote_path):
            successful_uploads[local_path] = current_mtimes[local_path]
    ftp.quit()
    if successful_uploads:
        print("\nUpdating upload state...")
        # Update the state file with all current modification times,
        # but only for files that were successfully uploaded in this run.
        # And preserve old state for files that were not part of this run.
        new_state = get_last_uploaded_times()
        new_state.update(successful_uploads)
        save_last_uploaded_times(new_state)
        print("Upload state updated.")
    print("\nProcess finished.")


if __name__ == "__main__":
    main()
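
The script imports its credentials from a separate ftp_config.py that is not part of the gist. A minimal sketch of that file, with placeholder values to replace with your own server details:

# ftp_config.py -- FTP credentials used by the upload script (placeholder values)
FTP_HOST = "ftp.example.com"  # FTP server hostname
FTP_USER = "your_username"    # FTP account user name
FTP_PASS = "your_password"    # FTP account password

Note that ftp_config.py is not listed in EXCLUDE_FILES, so either add it there or list it in .gitignore (whose patterns the script honors) to avoid uploading your credentials. Run the script from the root of the directory you want to mirror, for example python upload_to_ftp.py if that is the name you saved it under. After a confirmed run it records the modification times of successfully uploaded files in upload_state.json, so later runs only pick up files modified today that have changed since their last upload.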