Skip to content

Instantly share code, notes, and snippets.

@subak
Created November 20, 2015 12:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save subak/16294f4b40050ba14ff1 to your computer and use it in GitHub Desktop.
Save subak/16294f4b40050ba14ff1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
#
# unox
#
# Author: Hannes Landeholm <hannes@jumpstarter.io>
#
# The Unison beta (2.48) comes with file system change monitoring (repeat = watch)
# through an abstract "unison-fsmonitor" adapter that integrates with each OS's
# file update watch interface. This allows responsive, Dropbox-like master-master sync
# of files over SSH. The Unison beta ships adapters for Windows and Linux but
# unfortunately lacks one for OS X.
#
# This script implements the Unison fswatch protocol (see /src/fswatch.ml in the Unison
# source tree) and is intended to be installed as unison-fsmonitor in the PATH on OS X.
# It is the missing puzzle piece for repeat = watch support for Unison on OS X.
#
# Dependencies: pip install macfsevents
#
# Licence: MPLv2 (https://www.mozilla.org/MPL/2.0/)
import sys
import os
import time
import fsevents
import urllib.parse
import traceback
# Prefix prepended to every diagnostic line written to stderr.
my_log_prefix = "[unox]"
# Debug tracing is switched on by passing --debug on the command line.
_in_debug = ("--debug" in sys.argv)
# Extra-verbose dump of the trigger/wait state; nothing visible here enables it.
_in_debug_plus = False

# A single MacFSEvents observer shared by all replicas; it is started at import time.
observer = fsevents.Observer()
observer.start()

# Monitored replicas: replica hash -> {"stream": fsevents.Stream, "fspath": str}.
replicas = dict()

# Replicas Unison is currently waiting on: replica hash -> True.
pending_reps = dict()

# Replicas with unreported changes: replica hash -> either True (the whole
# replica changed) or a nested dict keyed by path token whose leaves are True.
triggered_reps = dict()
def format_exception(e):
    """Render exception *e* as a traceback string that also shows the caller's stack.

    An exception's own traceback only covers the frames between the handling
    ``try`` and the raise point. This helper prepends the frames leading to the
    caller (taken from the current stack), so the result reads like a complete
    "most recent call last" traceback.

    Args:
        e: the exception to format. Its own ``__traceback__`` is used, so the
            function works both inside and outside an ``except`` clause.

    Returns:
        The formatted traceback text, without a trailing newline.
    """
    # Drop this function's frame and the caller's frame. When called from the
    # handler that caught *e*, the caller's frame is already the first entry of
    # e.__traceback__, so keeping it would print it twice.
    lines = traceback.format_stack()[:-2]
    lines.extend(traceback.format_tb(e.__traceback__))
    lines.extend(traceback.format_exception_only(type(e), e))
    text = "Traceback (most recent call last):\n" + "".join(lines)
    # format_exception_only() output ends in a newline; strip it.
    return text[:-1]
def _debug_triggers():
    """Dump the trigger/wait bookkeeping to stderr when extra-verbose tracing is on.

    Does nothing unless the module-level ``_in_debug_plus`` flag is true.
    Otherwise it prints ``triggered_reps`` and, when any replica is being
    waited on, ``pending_reps`` as well.
    """
    if _in_debug_plus:
        suffix = (" | wait=" + str(pending_reps)) if pending_reps else ""
        sys.stderr.write("".join((my_log_prefix, "[DEBUG+]: trig=", str(triggered_reps), suffix, "\n")))
def _debug(msg):
    """Write *msg*, stripped of surrounding whitespace, to stderr as a DEBUG line."""
    sys.stderr.write("{}[DEBUG]: {}\n".format(my_log_prefix, msg.strip()))
def warn(msg):
    """Write *msg*, stripped of surrounding whitespace, to stderr as a WARN line."""
    text = msg.strip()
    sys.stderr.write("%s[WARN]: %s\n" % (my_log_prefix, text))
def sendCmd(cmd, args):
    """Write one protocol line to stdout: *cmd* followed by the URL-quoted *args*.

    The line is not flushed here. recvCmd() flushes before blocking on input,
    and injectCmd() flushes immediately.
    """
    raw_cmd = " ".join([cmd] + [urllib.parse.quote(arg) for arg in args])
    if _in_debug:
        _debug("sendCmd: " + raw_cmd)
    sys.stdout.write(raw_cmd + "\n")
def injectCmd(cmd, args):
    """Send a command outside the request/response loop and flush it at once.

    Used from the file-event callback (through triggerReplica), where no later
    recvCmd() call would flush the buffered line.
    """
    # NOTE(review): nothing here serialises these writes with sendCmd() calls
    # made by the main thread; confirm interleaved stdout output cannot occur.
    sendCmd(cmd, args)
    sys.stdout.flush()
def sendAck():
    """Acknowledge the current command with an argument-less ``OK`` line."""
    sendCmd(cmd="OK", args=[])
def sendError(msg):
    """Report a fatal ``ERROR`` to Unison and terminate the process with status 1.

    The output streams are flushed explicitly first. ``os._exit`` does not flush
    Python's buffered streams, so without the flush the ERROR line (buffered by
    sendCmd) could be lost. ``os._exit`` also skips ``finally`` clauses, so the
    observer cleanup in the script entry point does not run on this path.
    """
    sendCmd("ERROR", [msg])
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(1)
def recvCmd():
    """Read one protocol line from stdin and return it as ``[cmd, args]``.

    Buffered stdout output is flushed before blocking on the read, so replies
    are batched instead of flushed per write. If the line read does not end in
    a newline (stdin closed, or a truncated final line), the process exits with
    status 0. Arguments are URL-unquoted; the command word is returned as-is.
    """
    sys.stdout.flush()
    line = sys.stdin.readline()
    if not line.endswith("\n"):
        if _in_debug:
            _debug("stdin closed, exiting")
        sys.exit(0)
    if _in_debug:
        _debug("recvCmd: " + line)
    cmd, *quoted_args = line.strip().split(" ")
    return [cmd, [urllib.parse.unquote(word) for word in quoted_args]]
def pathTokenize(path):
    """Split a '/'-separated path into its non-empty components.

    Leading, trailing and repeated slashes produce no empty tokens,
    e.g. ``"/a//b/"`` -> ``["a", "b"]``.
    """
    return [tok for tok in path.split("/") if tok]
def triggerReplica(replica, local_path_toks):
    """Record a change under *replica* and wake Unison if it is waiting on it.

    If *replica* is in ``pending_reps``, a ``CHANGES`` line is sent right away
    and all pending waits are cleared. The changed path is then merged into
    ``triggered_reps[replica]``, a tree of nested dicts keyed by path token whose
    leaves are ``True``:

    * an empty *local_path_toks* (the replica root) replaces the tree with ``True``;
    * a path below a node that is already ``True`` is already covered and ignored;
    * otherwise missing branch dicts are created and the last token is set to
      ``True``, replacing any finer-grained subtree recorded there.

    Args:
        replica: replica hash, as received in START.
        local_path_toks: path tokens relative to the replica root.
    """
    global pending_reps
    if replica in pending_reps:
        # Unison is blocked in WAIT for this replica: notify now and reset waits.
        injectCmd("CHANGES", [replica])
        pending_reps = {}
    # NOTE(review): this runs from the fsevents callback thread, while the main
    # thread also reads and mutates triggered_reps/pending_reps without a lock.
    if not local_path_toks:
        # The root itself changed, which covers everything below it.
        triggered_reps[replica] = True
    else:
        node = triggered_reps.setdefault(replica, {})
        for tok in local_path_toks[:-1]:
            if node is True:
                break  # an ancestor is already marked as changed
            node = node.setdefault(tok, {})
        if node is not True:
            node[local_path_toks[-1]] = True
    _debug_triggers()
def _make_file_event_callback(replica, fspath):
    """Build the fsevents callback that turns events under *fspath* into triggers for *replica*."""

    def replicaFileEventCallback(event_path, mask):
        # *mask* (the event flags) is ignored: only the changed path is forwarded.
        try:
            if not event_path.startswith(fspath):
                return warn("unexpected file event at path [" + event_path + "] for [" + fspath + "]")
            local_path = event_path[len(fspath):]
            if _in_debug:
                _debug("replica:[" + replica + "] file event @[" + local_path + "] (" + event_path + ")")
            triggerReplica(replica, pathTokenize(local_path))
        except Exception as e:
            # This runs on a non-main thread, where an uncaught exception would only
            # kill that thread and leave the process running without monitoring.
            # Dump the traceback and terminate the whole process instead.
            sys.stderr.write(format_exception(e) + "\n")
            sys.stderr.flush()
            os._exit(1)

    return replicaFileEventCallback


def startReplicaMon(replica, fspath, path):
    """Handle a START command: start watching *fspath* for *replica*.

    On the first START for a replica, an fsevents stream for *fspath* is
    scheduled on the shared observer and recorded in ``replicas``. Repeated
    STARTs for a known replica skip that step. In both cases the command is
    acknowledged, and the following DIR lines are acknowledged until DONE.
    LINK or any other command is a fatal protocol error.

    Args:
        replica: replica hash chosen by Unison.
        fspath: root directory of the replica.
        path: sub-path Unison asked to monitor. It is currently unused: the
            whole *fspath* tree is always watched.
    """
    if replica not in replicas:
        # Ensure fspath has a trailing slash so that prefix stripping in the
        # callback yields a path relative to the replica root.
        fspath = os.path.join(fspath, "")
        if _in_debug:
            _debug("start monitoring of replica [" + replica + "] [" + fspath + "]")
        callback = _make_file_event_callback(replica, fspath)
        try:
            # OS X has no interface for "file level" events. Detecting them would
            # require comparing against a userspace snapshot, which is better
            # left to Unison, so the whole directory tree is watched.
            if _in_debug:
                _debug("replica:[" + replica + "] watching path [" + fspath + "]")
            stream = fsevents.Stream(callback, fspath)
            observer.schedule(stream)
        except (FileNotFoundError, NotADirectoryError) as e:
            sendError(str(e))  # exits the process
        replicas[replica] = {
            "stream": stream,
            "fspath": fspath,
        }
    sendAck()
    # Consume the DIR/LINK lines that follow START, up to DONE.
    while True:
        cmd, args = recvCmd()
        if cmd == "DIR":
            sendAck()
        elif cmd == "LINK":
            sendError("link following is not supported by unison-watchdog, please disable this option (-links)")
        elif cmd == "DONE":
            return
        else:
            sendError("unexpected cmd in replica start: " + cmd)
def reportRecursiveChanges(local_path, cur_lvl):
    """Emit one ``RECURSIVE`` line for every changed subtree recorded in *cur_lvl*.

    *cur_lvl* is a node of the trigger tree kept in ``triggered_reps``. It is
    either ``True`` (everything at *local_path* changed) or a dict mapping path
    tokens to child nodes. Paths are emitted depth-first, in dict insertion
    order, each built with ``os.path.join`` from *local_path*.
    """
    todo = [(local_path, cur_lvl)]
    while todo:
        where, node = todo.pop()
        if node == True:
            sendCmd("RECURSIVE", [where])
        else:
            # Push children in reverse so they are popped in insertion order.
            for tok, child in reversed(list(node.items())):
                todo.append((os.path.join(where, tok), child))
def main():
    """Run the unison-fsmonitor protocol loop over stdin/stdout.

    Performs the VERSION handshake, then dispatches root commands (DEBUG, START,
    WAIT, CHANGES, RESET) forever. The loop ends when recvCmd() exits on closed
    stdin, or when sendError() terminates the process on a protocol error.
    """
    # _in_debug must be declared global: DEBUG rebinds it, and without the
    # declaration the assignment would only create a function-local variable.
    global _in_debug, pending_reps
    # Version handshake.
    sendCmd("VERSION", ["1"])
    cmd, args = recvCmd()
    if cmd != "VERSION":
        sendError("unexpected version cmd: " + cmd)
    [v_no] = args
    if v_no != "1":
        warn("unexpected version: " + v_no)
    # Start watch operation.
    _debug_triggers()
    while True:
        cmd, args = recvCmd()
        # Any command other than WAIT cancels pending waits.
        if cmd != "WAIT":
            pending_reps = {}
        if cmd == "DEBUG":
            _in_debug = True
        elif cmd == "START":
            # Start observing a replica; the sub-path argument is optional.
            if len(args) >= 3:
                [replica, fspath, path] = args
            else:
                [replica, fspath] = args
                path = ""
            startReplicaMon(replica, fspath, path)
        elif cmd == "WAIT":
            # Start waiting for changes on another replica.
            [replica] = args
            if replica not in replicas:
                sendError("unknown replica: " + replica)
            if replica in triggered_reps:
                # Changes are already recorded: notify Unison right away.
                sendCmd("CHANGES", [replica])
                pending_reps = {}
            else:
                pending_reps[replica] = True
            _debug_triggers()
        elif cmd == "CHANGES":
            # Report and clear the recorded changes for a replica.
            [replica] = args
            if replica not in replicas:
                sendError("unknown replica: " + replica)
            changes = triggered_reps.pop(replica, None)
            if changes is not None:
                reportRecursiveChanges("", changes)
            sendCmd("DONE", [])
            _debug_triggers()
        elif cmd == "RESET":
            # Stop observing a replica and forget its recorded changes.
            [replica] = args
            if replica not in replicas:
                warn("unknown replica: " + replica)
                continue
            stream = replicas[replica]["stream"]
            if stream is not None:
                observer.unschedule(stream)
            del replicas[replica]
            triggered_reps.pop(replica, None)
            _debug_triggers()
        else:
            sendError("unexpected root cmd: " + cmd)
if __name__ == '__main__':
    try:
        main()
    finally:
        # Detach every replica stream. The nested finally makes sure the shared
        # observer is stopped and joined even if an unschedule call raises.
        try:
            for replica_state in list(replicas.values()):
                # Skip missing streams, as the RESET handler does.
                if replica_state["stream"] is not None:
                    observer.unschedule(replica_state["stream"])
        finally:
            observer.stop()
            observer.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment