Skip to content

Instantly share code, notes, and snippets.

@quark-zju
Created April 17, 2019 16:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save quark-zju/11d4678051cc64420bfd57b81dda5d0f to your computer and use it in GitHub Desktop.
Save quark-zju/11d4678051cc64420bfd57b81dda5d0f to your computer and use it in GitHub Desktop.
A naive incremental sync script via sftp
#!/usr/bin/env python
"""A naive incremental sync script via sftp
When do you want to use this?
- want a non-lazy local file system (not fuse/sshfs)
- sftp is easier to use in your environment than ssh-based tools (rsync or git)
This script is very simple and does not handle complex cases. Use rsync to:
- set up an initial mirror
- re-sync after network errors
- remove or mirror empty directories
"""
from contextlib import contextmanager
import errno
import os
import paramiko
import socket
import subprocess
import sys
import time
# Short module-level alias for os.path.join.
join = os.path.join
def connect(sshargs):
    """Build the SFTP client used by this script.

    ``sshargs`` is the list of arguments handed to ``ssh``.  The returned
    object is a RetryClient constructed from those arguments.
    """
    client = RetryClient(sshargs)
    return client
class Channel(object):
    """Minimal channel adapter around a socket object.

    SFTPClient requires its "Channel" to have a ``get_name`` method, which a
    plain socket does not provide.  This wrapper supplies it and forwards
    ``send``, ``recv`` and ``close`` to the wrapped socket.
    """

    def __init__(self, sock):
        # The wrapped socket; all I/O is delegated to it.
        self.sock = sock

    def get_name(self):
        """Return the channel name (always the empty string)."""
        return ""

    def send(self, *args, **kwargs):
        """Forward to ``sock.send`` and return its result."""
        return self.sock.send(*args, **kwargs)

    def recv(self, *args, **kwargs):
        """Forward to ``sock.recv`` and return its result."""
        return self.sock.recv(*args, **kwargs)

    def close(self):
        """Close the wrapped socket.

        SFTPClient.close() closes its underlying channel, so without this
        method closing the client would fail with AttributeError.
        """
        return self.sock.close()
class RetryClient(object):
    """Proxy around paramiko's SFTPClient that reconnects on a broken pipe.

    The connection is created lazily by running ``ssh ... -s sftp`` as a
    subprocess whose stdin/stdout are one end of a socket pair; the other end
    is handed to SFTPClient.  Public attributes not defined on this class are
    resolved on the underlying client and wrapped so that a call failing with
    EPIPE triggers a reconnect and a retry.
    """

    # Maximum number of reconnect-and-retry rounds for one wrapped call.
    # Bounded so a permanently broken connection surfaces the error instead
    # of recursing (and spawning ssh) until the interpreter gives up.
    max_retries = 5

    def __init__(self, sshargs):
        self.sshargs = sshargs
        self._client = None
        self._sock = None
        self._proc = None

    def _cleanup(self):
        """Release the socket and ssh subprocess of the previous connection."""
        sock, proc = self._sock, self._proc
        self._client = self._sock = self._proc = None
        if sock is not None:
            sock.close()
        if proc is not None:
            if proc.poll() is None:
                proc.terminate()
            # Reap the child so it does not linger as a zombie.
            proc.wait()

    def _reconnect(self):
        """Start a new ssh sftp subprocess and attach a fresh SFTPClient."""
        self._cleanup()
        childend, ourend = socket.socketpair()
        with scopedeprint("Connecting %r" % self.sshargs):
            try:
                proc = subprocess.Popen(
                    ["ssh"] + self.sshargs + ["-x", "-a", "-s", "sftp"],
                    stdin=childend.fileno(),
                    stdout=childend.fileno(),
                )
            except Exception:
                ourend.close()
                raise
            finally:
                # The child holds its own copies of this end.  Closing ours
                # lets our end observe EOF / EPIPE once ssh exits.
                childend.close()
            # Record these before the handshake so a failed handshake is
            # still cleaned up by the next _reconnect().
            self._proc = proc
            self._sock = ourend
            self._client = paramiko.sftp_client.SFTPClient(Channel(ourend))

    @property
    def client(self):
        """The underlying SFTP client, connecting first if necessary."""
        if self._client is None:
            self._reconnect()
        return self._client

    def _retrywrapper(self, funcname):
        """Return a callable invoking ``client.<funcname>`` with EPIPE retry.

        The attribute is looked up again on every attempt so a retry uses the
        client created by the reconnect.  Errors other than EPIPE, and EPIPE
        after ``max_retries`` reconnects, propagate to the caller.
        """

        def wrapper(*args, **kwargs):
            attempts = 0
            while True:
                func = getattr(self.client, funcname)
                try:
                    return func(*args, **kwargs)
                except socket.error as ex:
                    if ex.errno != errno.EPIPE or attempts >= self.max_retries:
                        raise
                    attempts += 1
                    self._reconnect()

        return wrapper

    def mkdirp(self, path):
        """Create remote directory ``path``, creating missing parents first.

        Raises the IOError from ``mkdir`` when it is not ENOENT or when there
        is no further parent to try.
        """
        mkdir = self._retrywrapper("mkdir")
        try:
            return mkdir(path)
        except IOError as ex:
            parent = os.path.dirname(path)
            if ex.errno == errno.ENOENT and path != parent:
                self.mkdirp(parent)
                return mkdir(path)
            raise

    def put(self, localpath, remotepath, confirm=False):
        """Upload ``localpath`` to ``remotepath``.

        If the upload fails with ENOENT and ``remotepath`` contains a "/",
        the remote parent directory is created and the upload is tried once
        more.  Other IOErrors propagate.
        """
        put = self._retrywrapper("put")
        try:
            return put(localpath=localpath, remotepath=remotepath, confirm=confirm)
        except IOError as ex:
            if ex.errno == errno.ENOENT and "/" in remotepath:
                self.mkdirp(os.path.dirname(remotepath))
                return put(
                    localpath=localpath, remotepath=remotepath, confirm=confirm
                )
            raise

    def __getattr__(self, name):
        # Only public names are proxied.  A missing private or dunder
        # attribute must raise AttributeError, otherwise hasattr(), copy and
        # partially initialised instances get a bogus wrapper back.
        if name.startswith("_"):
            raise AttributeError(name)
        return self._retrywrapper(name)
def eprint(msg):
    """Write ``msg`` followed by a newline to stderr."""
    # Wrap in a 1-tuple so a tuple ``msg`` is printed as-is instead of being
    # unpacked as multiple %-format arguments.
    sys.stderr.write("%s\n" % (msg,))
# Nesting depth of active top-level scopedeprint() scopes.  Kept in a list so
# the function can mutate it without a ``global`` statement.
_scopelevel = [0]


@contextmanager
def scopedeprint(msg):
    """Print ``msg ...`` to stderr, then the outcome when the scope ends.

    A top-level scope prints "<msg> ..." on entry and " done" on success, or
    " <exception>" when the body raises an Exception (which is re-raised).
    A scope opened while another one is active only prints " <msg> ..." on
    the same line and leaves the outcome to the outer scope.
    """
    if _scopelevel[0] > 0:
        sys.stderr.write(" %s ..." % (msg,))
        sys.stderr.flush()
        yield
        return
    sys.stderr.write("%s ..." % (msg,))
    sys.stderr.flush()
    _scopelevel[0] += 1
    try:
        yield
    except Exception as ex:
        sys.stderr.write(" %s\n" % (ex,))
        raise
    else:
        sys.stderr.write(" done\n")
    finally:
        # Restore the depth for every exit path, including KeyboardInterrupt
        # and other non-Exception exits, so later scopes are not mistaken for
        # nested ones.
        _scopelevel[0] -= 1
        sys.stderr.flush()
def find(path):
    """Yield the path of every file found by walking ``path`` recursively.

    Directories themselves are not yielded; each result is the walked
    directory joined with the file name.
    """
    for dirpath, _dirnames, filenames in os.walk(path):
        for filename in filenames:
            yield os.path.join(dirpath, filename)
def stat(path):
    """Return the modification time of ``path`` without following symlinks.

    Returns 0 when ``os.lstat`` fails with OSError (for example when the
    file does not exist).
    """
    try:
        info = os.lstat(path)
    except OSError:
        return 0
    return info.st_mtime
def main(args):
    """Mirror local file changes to a remote directory over sftp.

    ``args`` is ``SSH_ARGS... REMOTE_PATH LOCAL_PATH``: the last item is the
    local directory to watch, the one before it is the remote directory, and
    everything else is passed to ``ssh``.

    Returns 1 after printing usage when the arguments are insufficient or
    help was requested.  Otherwise polls the local tree every 0.5 seconds,
    uploading files whose mtime changed and unlinking remote files whose
    local counterpart disappeared; this loop does not return.
    """
    # At least one ssh argument (the destination) plus both paths is needed.
    if len(args) < 3 or any(s in args for s in ("--help", "-h")):
        eprint("Usage: $0 SSH_ARGS REMOTE_PATH LOCAL_PATH\n")
        eprint("Mirror local file changes to remote via the sftp protocol.")
        eprint("This tool does not do the initial mirroring. Use 'rsync' for that.")
        return 1
    localpath = args[-1]
    remotepath = args[-2]
    sshargs = args[:-2]

    def scan():
        # Snapshot of {local file path: mtime} for the whole tree.
        return {p: stat(p) for p in find(localpath)}

    def toremote(path):
        # relpath copes with a trailing slash on LOCAL_PATH, which a plain
        # prefix check on ``localpath + "/"`` does not.
        return join(remotepath, os.path.relpath(path, localpath))

    client = connect(sshargs)
    with scopedeprint("Scanning %s" % localpath):
        state = scan()
    eprint("Waiting for changes on %s" % localpath)
    while True:
        newstate = scan()
        # New or modified files: mtime differs from the previous snapshot.
        for path, mtime in newstate.items():
            if mtime != state.get(path):
                with scopedeprint("Uploading %s" % path):
                    client.put(
                        localpath=path, remotepath=toremote(path), confirm=False
                    )
        # Files present in the previous snapshot but gone now.
        for path in set(state) - set(newstate):
            with scopedeprint("Removing %s" % path):
                client.unlink(toremote(path))
        state = newstate
        time.sleep(0.5)
if __name__ == "__main__":
    # Run the script entry point; a falsy result (None) becomes exit code 0.
    exitcode = main(sys.argv[1:])
    sys.exit(exitcode or 0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment