Skip to content

Instantly share code, notes, and snippets.

@LemonPi
Last active November 3, 2018 20:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LemonPi/49e749489a46186665ea4a5e5e0819c1 to your computer and use it in GitHub Desktop.
Save LemonPi/49e749489a46186665ea4a5e5e0819c1 to your computer and use it in GitHub Desktop.
Automatic rsync on file change (alternative for systems without inotifywait)
import sys
import time
import logging
from watchdog.observers import Observer
from watchdog.events import RegexMatchingEventHandler
from subprocess import call
import argparse
class RsyncHandler(RegexMatchingEventHandler):
def __init__(self, path, target):
# ignore hidden files
super(RsyncHandler, self).__init__(ignore_regexes=['^[.]{1}.*', '.*/[.]{1}.*'])
logging.info("rsyncing %s to %s", path, target)
self.path = path
self.target = target
self.debounce_threshold = 1 # require at least 1 second between syncs
self.last_sync = None
def sync(self):
if self.last_sync is None or time.time() - self.last_sync > self.debounce_threshold:
logging.info("sync")
call(["rsync", "-avz", "-e", "ssh", self.path, self.target])
self.last_sync = time.time()
def on_any_event(self, event):
self.sync()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
parser = argparse.ArgumentParser(description='Sync files to balancebot')
parser.add_argument('path', default='.', help='local path to sync')
parser.add_argument('-r', '--reverse', action='store_true', help='Whether to reverse the direction of syncing')
parser.add_argument('-w', '--wifi', action='store_true', help='Whether to connect via wifi')
args = parser.parse_args()
ip = '3.105' if args.wifi else '7.2'
path = args.path
target = "debian@192.168.{}:~/{}".format(ip, path)
if args.reverse:
target, path = path, target
event_handler = RsyncHandler(path, target)
# initial sync
event_handler.sync()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment