Skip to content

Instantly share code, notes, and snippets.

@ryaminal
Created April 17, 2024 03:16
Show Gist options
  • Save ryaminal/38b31ec03eb994f9a0adfd87848eb4ad to your computer and use it in GitHub Desktop.
Save ryaminal/38b31ec03eb994f9a0adfd87848eb4ad to your computer and use it in GitHub Desktop.
Non-generic rsync for fsspec
# Heavily inspired by the rsync implementation in fsspec's generic module (fsspec.generic)
import logging
from typing import Any, Callable
from fsspec.implementations.sftp import AbstractFileSystem
from gcsfs import GCSFileSystem
from rich.logging import RichHandler
from sshfs import SSHFileSystem
LOG_FORMAT = "%(message)s"
LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
# RichHandler draws its own timestamp column using ``log_time_format`` rather than the
# formatter's ``datefmt``, so LOG_DATE_FORMAT must be handed to the handler to take effect.
# ``datefmt`` is also passed so the formatter agrees should LOG_FORMAT ever use %(asctime)s.
logging.basicConfig(
    level="INFO",
    format=LOG_FORMAT,
    datefmt=LOG_DATE_FORMAT,
    handlers=[RichHandler(log_time_format=LOG_DATE_FORMAT)],
)
log = logging.getLogger(__name__)
def copy_file_fs_to_fs_no_temp(
    source_fs: AbstractFileSystem,
    source_path: str,
    dest_fs: AbstractFileSystem,
    dest_path: str,
    *,
    chunk_size: int = 2**20,
) -> None:
    """Stream one file from *source_fs* to *dest_fs* without a local temporary copy.

    Args:
        source_fs: Filesystem to read from.
        source_path: Path of the file on *source_fs*.
        dest_fs: Filesystem to write to.
        dest_path: Path of the file to write on *dest_fs*.
        chunk_size: Number of bytes requested per ``read`` call (default 1 MiB).

    Raises:
        ValueError: if *chunk_size* is not positive.
    """
    if chunk_size <= 0:
        # read(0) returns b"" at once, which would silently produce an empty destination file.
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    log.info(f"Copying {source_path} -> {dest_path}")
    # Open options mirror fsspec's generic rsync copy helper.
    # NOTE(review): fsspec's ``open`` names its block-size argument ``block_size``;
    # ``blocksize`` is forwarded to the backend's ``_open`` as an extra keyword.
    # Confirm it has the intended effect for sshfs/gcsfs.
    open_kwargs = {"blocksize": 0, "cache_type": "none"}
    with (
        source_fs.open(source_path, "rb", **open_kwargs) as src,
        dest_fs.open(dest_path, "wb", **open_kwargs) as dest,
    ):
        while data := src.read(chunk_size):
            dest.write(data)
    log.info(f"Copied {source_path} -> {dest_path}")
# TODO: async this ish
def copy_sftp_to_gcs(
source_url: str,
dest_url: str,
delete_missing: bool = False,
source_diff_field: str | Callable[[dict], Any] = "size",
dest_diff_field: str | Callable[[dict], Any] = "size",
update_cond: str = "different",
):
"""
Copies new and changed files from SFTP server (including nested directories) to Google Cloud Storage.
Args:
sftp_url (str): URL of the SFTP server (e.g. sftp://user@host:port)
gcs_url (str): Name of the Google Cloud Storage bucket (e.g. gs://my-bucket/my-dir/)
"""
# Initialize file systems
_kwargs = SSHFileSystem._get_kwargs_from_urls(source_url)
source_fs = SSHFileSystem(**_kwargs)
dest_fs = GCSFileSystem()
# strip protocols from url leaving only paths e.g. sftp://user@host:port/dir1/dir2/stuff -> /dir1/dir2/stuff
source_path = source_fs._strip_protocol(source_url)
dest_path = dest_fs._strip_protocol(dest_url)
# Find all files in SFTP source directory with details
if not source_fs.isdir(source_path):
raise ValueError(f"Path {source_path} is not a directory")
# get all files and dirs for source and dest
source_files_and_dirs: dict[str, dict] = source_fs.find(source_path, detail=True, withdirs=True)
dest_files_and_dirs: dict[str, dict] = dest_fs.find(dest_path, detail=True, withdirs=True)
# Create directories that don't already exist in the destination
source_dirs_to_create = [
_dir
for _dir, details in source_files_and_dirs.items()
if details["type"] == "directory" and _dir.replace(source_path, dest_path) not in dest_files_and_dirs
]
for _dir in source_dirs_to_create:
# with the current dest of gcs, this is a noop. bucket must exist
dest_fs.makedirs(_dir, exist_ok=True)
source_files = {_path: details for _path, details in source_files_and_dirs.items() if details["type"] == "file"}
for _path, details in source_files.copy().items(): # copy so we can modify the original
dest_file = _path.replace(source_path, dest_path)
if dest_file in dest_files_and_dirs:
match update_cond:
case "different":
src_field = (
source_diff_field(details) if callable(source_diff_field) else details[source_diff_field]
)
dest_details = dest_files_and_dirs[dest_file]
dest_field = (
dest_diff_field(dest_details) if callable(dest_diff_field) else dest_details[dest_diff_field]
)
if src_field != dest_field:
source_files[_path] = dest_file
else:
source_files.pop(_path)
case "always":
source_files[_path] = dest_file
case _:
# noop, maybe debug log?
pass
else:
source_files[_path] = dest_file
if source_files:
log.debug(f"Copying {len(source_files)} files from {source_url} to {dest_url}")
for src, dst in source_files.items():
copy_file_fs_to_fs_no_temp(source_fs, src, dest_fs, dst)
if delete_missing:
# TODO: handle deleting files in destination?
raise NotImplementedError("delete_missing not yet implemented")
if __name__ == "__main__":
    import argparse

    # Source and destination may be given on the command line; the defaults are the
    # original placeholder URLs, so running with no arguments behaves as before.
    parser = argparse.ArgumentParser(
        description="Copy new and changed files from an SFTP directory to GCS."
    )
    parser.add_argument(
        "source_url",
        nargs="?",
        default="sftp://username@host:12345/stuff",
        help="SFTP source directory URL (sftp://user@host:port/path)",
    )
    parser.add_argument(
        "dest_url",
        nargs="?",
        default="gs://bucket/dir1/dir2",
        help="GCS destination URL (gs://bucket/prefix)",
    )
    args = parser.parse_args()
    copy_sftp_to_gcs(args.source_url, args.dest_url)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment