@wrouesnel
Created August 30, 2024 06:21
Structured Syncthing share management
# This is the list of leaf folders which exist in the entire sync infrastructure.
# This file is processed as a list of command line arguments.
#
# Supported syntax:
# Template Controls - control whether a templated line is generated at all
# --not-if : Don't template if device condition matches
# --only-if : Template only if device condition matches
# Share Controls - restrict hosts the folder is offered to (and whether they elect to receive it)
# --not-shared-if: Don't offer this template result to a device if it matches
# --only-shared-if: Only offer this template result to a device if it matches
# Sync Controls
# --sync-mode: Configure directional sync mode. Mode can be either:
# <sendonly|receiveonly>:<+host|hostname>
# This is the config directory the script itself expects - it looks for these config files under `$HOME/config/shared`.
# When setting up a new system, this is the folder you copy to your home directory.
config
# Sharing the main Documents folder, and excluding it from devices marked "android" or with the option "lightweight".
Documents --not-shared-if=platform:android --not-shared-if=options:lightweight
# Send the Music folder everywhere.
Music
# Managing keepass folders - this template syntax means for each host in hostctl, a keepass/<host> directory and folder share
# is created (except where exclusions apply - i.e. for android). The KeePass databases have trigger scripts to synchronize against keepass/master
# but never open it directly. All devices end up having a copy of every other device's local keepass plus the synchronization master,
# which provides robustness against conflicts/corruption.
# Note: --sync-mode=sendonly:+host marks that only the directory with the same name as the host *on that host* should be set up as
# "send only" - all other hosts see the directory as receive only.
keepass/master
keepass/{{host}} --not-if=platform:android --not-if=options:introducer --sync-mode=sendonly:+host
# A method to use git in a decentralized way - set up all your bare git repos under ~/git, and the mostly append-only nature of git
# operations means synchronization will just work - at least for unbusy repositories (there's only one of you).
git --not-shared-if=platform:android --not-shared-if=options:lightweight
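# For example, with the hostctl below, the keepass/{{host}} line above expands to keepass/linux-desktop and
# keepass/windows-desktop: the android phone is excluded by --not-if=platform:android and the home server by
# --not-if=options:introducer. On linux-desktop the keepass/linux-desktop share is sendonly; every other host
# receives it as receiveonly.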
# The hostctl file is the list of expected hosts in the syncthing infrastructure.
# The format is <name> <platform> <device_id> [options]
# [options] is comma-separated and must not contain spaces; each option can be any string and can be used in folderctl selectors.
# The special option name "introducer" marks a host as an introducer.
# Other options have no special meaning and are treated as tags - e.g. we use lightweight to exclude some shares from going to Windows in the
# example folderctl.
#
# --path-remap allows remapping of folders for the purposes of underlying storage paths
# --sync-receive-default allows marking a host as a default receive-only system (unless explicitly overridden in folderctl)
#
# If you're adding a new host, you should edit this file and add it as a --sync-receive-default host first until you're set up,
# then remove --sync-receive-default and run update-shares to make it sendreceive.
# A basic linux desktop
linux-desktop linux <syncthing device ID here>
# A windows desktop which we tag as "lightweight" so we can send fewer files to it (i.e. because we dual boot)
windows-desktop windows <syncthing device ID here> lightweight
# An android phone we run syncthing-android on
android-phone android <syncthing device ID here>
# A home server which acts as our central server and is marked as an introducer. You don't *have* to do it like this,
# but it's a convenient way to organize regular backups/redundancy. Note the --path-remap option since this uses the
# syncthing docker container which stores its own config under /var/lib/syncthing/config and would clash with the `config`
# folder. The remap option means folders labelled `config` have paths generated as though they're `config_common`.
home.server.on.my.domain linux <syncthing device ID here> introducer --path-remap config config_common --sync-receive-default
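# With the --path-remap above, a folder labelled `config` is stored under <syncthing home dir>/config_common
# on this host (see resolve_label_to_path / path_remaps in the script below) while keeping the label `config`
# for the rest of the cluster; every other label resolves to <syncthing home dir>/<label> as usual.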
#!/usr/bin/env python3
# Script implementing management of Syncthing shares via a consistent scheme.
import os
import os.path
import random
import shlex
import string
import sys
from typing import TextIO, Set, Iterable, Tuple, Optional, Dict, List, Mapping
import logging
import dataclasses
import requests
import argparse
from pathlib import Path
from lxml import etree
from dataclasses import dataclass, field
from jinja2 import Environment, BaseLoader
logger = logging.getLogger()
_LOG_FORMATS = {
"console": logging.Formatter("%(levelname)s %(lineno)d:%(funcName)s: %(message)s")
}
_LOG_TARGETS = {
"stdout": sys.stdout,
"stderr": sys.stderr,
"unknown": sys.stderr,
}
_logging_configured = False
def configure_logging(log_level: str, log_format: str, log_dest: str) -> None:
global _logging_configured
if _logging_configured is True:
return
formatter = _LOG_FORMATS.get(log_format, _LOG_FORMATS["console"])
handler = logging.StreamHandler(_LOG_TARGETS.get(log_dest, _LOG_TARGETS["unknown"]))
handler.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.handlers = [handler]
root_logger.setLevel(logging._nameToLevel[log_level.upper()])
root_logger.debug(f"Logging configured: {log_level} {log_format} {log_dest}")
class RelativeSession(requests.Session):
def __init__(self, base_url):
super(RelativeSession, self).__init__()
self.__base_url = base_url.strip().rstrip("/")
def request(self, method, url, **kwargs):
url = url.lstrip("/")
return super(RelativeSession, self).request(method, f"{self.__base_url}/{url}", **kwargs)
def new_syncthing_folder_id(existing_ids: Optional[Set[str]] = None) -> str:
"""Generate a random syncthing folder ID which does not collide with any ID in existing_ids"""
existing_ids = existing_ids if existing_ids is not None else set()
rand = random.SystemRandom()
valid_chars = string.ascii_lowercase + string.digits
while True:
proposal = "-".join(("".join(rand.choices(valid_chars, k=5)), "".join(rand.choices(valid_chars, k=5))))
if proposal not in existing_ids:
break
return proposal
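# Illustrative: generated folder IDs look like "k3f9a-x72bq" (a hypothetical example) - two 5-character
# groups of lowercase letters and digits joined by a dash - and generation retries until the ID is not
# already present in existing_ids.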
def read_ctl(name):
"""Read lines from a 'ctl' format file"""
lines = []
with open(name) as f:
for line in f:
line = line.strip()
if line.startswith("#"):
continue
if line == "":
continue
lines.append(line)
return lines
def normalize_device_id(device_id: str) -> str:
normed = device_id.replace("-", "").upper()
if len(normed) not in (52, 56):
raise ValueError("device_id invalid: incorrect length")
chunks = [normed[ind:ind + 7] for ind in range(0, len(normed), 7)]
return "-".join(chunks)
HOST_OPTION_INTRODUCER = "introducer"
"""Indicates the configured host should be marked as an introducer"""
@dataclass
class HostSpec:
"""Definition of the host spec format used for hostctl"""
name: str
platform: str
device_id: str
options: Set[str] = field(default_factory=lambda: set())
path_remaps: Dict[str, str] = field(default_factory=lambda: dict())
sync_direction_default: str = "sendreceive"
# @classmethod
# def from_line(cls, line):
# parts = line.split(" ")
#
# name = parts[0]
# platform = parts[1]
# device_id = normalize_device_id(parts[2])
# options = set(parts[3].split(",")) if len(parts) > 3 else set()
#
# return cls(
# name=name,
# platform=platform,
# device_id=device_id,
# options=options,
# )
#
# def to_tuples(self):
# return (self.name, self.platform, self.device_id, ",".join(sorted(self.options)))
def to_line(self):
opts = " ".join([ f"--path-remap {k} {v}" for k,v in self.path_remaps.items() ])
line = f"{self.name} {self.platform} {self.device_id} {','.join(sorted(self.options))}"
if len(opts) > 0:
line = f"{line} {opts}"
return line
@dataclass
class FolderSpec:
"""Processed folder definition"""
name: str
path: Path
allowed_hosts: List[HostSpec]
sync_direction: str
@property
def allowed_device_ids(self) -> Set[str]:
return { hs.device_id for hs in self.allowed_hosts }
hostctl_parser = argparse.ArgumentParser()
hostctl_parser.add_argument("name", type=str)
hostctl_parser.add_argument("platform", type=str, help="String identifier of the platform type")
hostctl_parser.add_argument("device_id", type=str, help="Syncthing Device ID")
hostctl_parser.add_argument("options", nargs="?", type=str, help="Comma separated string of option vales")
hostctl_parser.add_argument("--path-remap", nargs=2, action="append", type=str, help="Specify a path which should be remapped for storage purposes on this host")
hostctl_parser.add_argument("--sync-receive-default", action="store_true", help="Default all shares on this host to receive only")
def get_known_devices(hostctl_file) -> Tuple[HostSpec,...]:
"""Read a hostctl file and return host spec objects"""
hosts = []
for line in read_ctl(hostctl_file):
hostargs = hostctl_parser.parse_args(shlex.split(line))
if hostargs.options is None:
hostargs.options = ""
if hostargs.path_remap is None:
hostargs.path_remap = []
hostspec = HostSpec(
name=hostargs.name,
platform=hostargs.platform,
device_id=hostargs.device_id,
options=set(hostargs.options.split(",")),
path_remaps={ k:v for (k,v) in hostargs.path_remap },
sync_direction_default="receiveonly" if hostargs.sync_receive_default else "sendreceive",
)
hosts.append(hostspec)
return tuple(hosts)
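# Illustrative: a hostctl line such as
#   home.server.on.my.domain linux <device id> introducer --path-remap config config_common --sync-receive-default
# parses into HostSpec(name="home.server.on.my.domain", platform="linux", device_id="<device id>",
# options={"introducer"}, path_remaps={"config": "config_common"}, sync_direction_default="receiveonly").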
class FolderCtlError(Exception):
"""Raised when a folderctl configuration issue is found"""
# This argparser handles processing folderctl lines.
expanding_parser = argparse.ArgumentParser()
expanding_parser.add_argument("folder_name", type=str, help="Relative path of the folder")
expanding_parser.add_argument("--not-if", action="append", help="Don't template if condition is met")
expanding_parser.add_argument("--only-if", action="append", help="Only template if condition is met")
expanding_parser.add_argument("--not-shared-if", action="append", help="Do NOT share folder if condition is met")
expanding_parser.add_argument("--only-shared-if", action="append", help="Only share folder if condition is met")
expanding_parser.add_argument("--sync-mode", type=str, action="append", help="Configure a sync mode")
hostspec_synonyms = {
"ostype": "platform"
}
@dataclass
class SyncModeSpec:
value: str
opposite: str
hostspec_fields = { f.name for f in dataclasses.fields(HostSpec) }
valid_synctypes = { "sendonly", "receiveonly", "sendreceive" }
def _build_condition_evaluator(condition):
"""Build a condition evaluator"""
key, value = condition.split(":", maxsplit=1)
# Convert the key if it's a synonym
key = hostspec_synonyms.get(key, key)
if key not in hostspec_fields:
raise KeyError(f"{key} is not a selectable field: must be one-of {list(sorted(hostspec_fields))}")
def _evaluator(spec):
spec_value = getattr(spec, key)
if isinstance(spec_value, Set):
return value in spec_value
else:
return value == spec_value
return _evaluator
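# Illustrative example of how folderctl conditions are evaluated against hostctl entries
# (device IDs are placeholders):
#   android = HostSpec(name="android-phone", platform="android", device_id="<device id>")
#   desktop = HostSpec(name="windows-desktop", platform="windows", device_id="<device id>", options={"lightweight"})
#   _build_condition_evaluator("platform:android")(android)      # -> True  (scalar field: equality)
#   _build_condition_evaluator("options:lightweight")(desktop)   # -> True  (set field: membership)
#   _build_condition_evaluator("ostype:linux")(desktop)          # -> False ("ostype" is a synonym for "platform")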
def expand_folderctl(syncthing_session: requests.Session, hostctl_file: Path, folderctl_file: Path, local_deviceid: Optional[str]=None) -> List[FolderSpec]:
"""Return the effective folderctl file after templating"""
expanded = []
expanded_names = set()
home_dir = _syncthing_homedir(syncthing_session)
known_devices = get_known_devices(hostctl_file)
devicename_to_hostspec: Dict[str, HostSpec] = {hostspec.name: hostspec for hostspec in known_devices}
deviceid_to_hostspec: Dict[str, HostSpec] = {hostspec.device_id: hostspec for hostspec in known_devices}
for line in read_ctl(folderctl_file):
parse_result = expanding_parser.parse_args(shlex.split(line))
notif_evaluators = []
onlyif_evaluators = []
not_shared_if_evaluators = []
only_shared_if_evaluators = []
# Process options
if parse_result.not_if is not None:
for condition in parse_result.not_if:
notif_evaluators.append(_build_condition_evaluator(condition))
if parse_result.only_if is not None:
for condition in parse_result.only_if:
onlyif_evaluators.append(_build_condition_evaluator(condition))
if parse_result.not_shared_if is not None:
for condition in parse_result.not_shared_if:
not_shared_if_evaluators.append(_build_condition_evaluator(condition))
if parse_result.only_shared_if is not None:
for condition in parse_result.only_shared_if:
only_shared_if_evaluators.append(_build_condition_evaluator(condition))
# Parse the sync direction config
sync_config = {}
if parse_result.sync_mode is not None:
for sync_mode_spec in parse_result.sync_mode:
sync_name, hostname = sync_mode_spec.strip().split(":", maxsplit=1)
if sync_name not in valid_synctypes:
raise FolderCtlError(f"Received unknown sync type: {sync_name} (allowed: {','.format(sorted(valid_synctypes))})")
if sync_name not in sync_config:
sync_config[sync_name] = set()
sync_config[sync_name].add(hostname)
# Validate the sync config
if len(sync_config.get("sendonly", set())) > 2:
raise FolderCtlError(f"Cannot have more then 1 sendonly host for folder: {parse_result.folder_name}")
if len(sync_config.get("sendonly", set())) > 0 and len(sync_config.get("sendreceive", set())) > 0:
raise FolderCtlError(f"Cannot have a sendreceive host for folder which also has a sendonly host: {parse_result.folder_name}")
host_sync_config = {}
for sync_name, hostnames in sync_config.items():
for hostname in hostnames:
if hostname in host_sync_config:
raise FolderCtlError(f"Host appears multiple times in sync config - can only appear once: {hostname}")
host_sync_config[hostname] = sync_name
for hostspec in known_devices:
# Evaluate
if any(fn(hostspec) for fn in notif_evaluators):
continue
if not all(fn(hostspec) for fn in onlyif_evaluators):
continue
# Got through templating evaluators
rtemplate = Environment(loader=BaseLoader).from_string(parse_result.folder_name)
result = rtemplate.render(ostype=hostspec.platform,
platform=hostspec.platform,
host=hostspec.name,
options=hostspec.options,
device_id=hostspec.device_id)
if result == "":
# Skip if templating produces an empty string
continue
if result in expanded_names:
# Continue if name match already happened
continue
expanded_names.add(result)
allowed_hosts = []
for candidate_hostspec in known_devices:
if any(fn(candidate_hostspec) for fn in not_shared_if_evaluators):
continue
if not all(fn(candidate_hostspec) for fn in only_shared_if_evaluators):
continue
allowed_hosts.append(candidate_hostspec)
# If local device ID is provided, then lookup and remap the path if a remap is present - i.e.
# effective folderlabel is the folderlabel we consider for the destination path
effective_folderlabel = result
if local_deviceid is not None:
effective_folderlabel = deviceid_to_hostspec[local_deviceid].path_remaps.get(result,result)
else:
logger.warning("No local device ID supplied - no host specific path remaps applied to resolved paths")
# Process sync-direction if specified relative to *this* host
local_hostspec = deviceid_to_hostspec.get(local_deviceid) if local_deviceid is not None else None
sync_direction = local_hostspec.sync_direction_default if local_hostspec is not None else "sendreceive"
local_hostname = local_hostspec.name if local_hostspec is not None else None
if local_hostname in host_sync_config:
# Have a specific config. Apply it.
sync_direction = host_sync_config[local_hostname]
elif "+host" in host_sync_config:
# Template config. If local host matches template host, apply it.
if local_hostname == hostspec.name:
# Match - apply the specified config.
sync_direction = host_sync_config["+host"]
else:
if host_sync_config["+host"] == "sendonly":
# If we have a sendonly +host match, everyone else is default receiveonly
sync_direction = "receiveonly"
elif host_sync_config["+host"] == "receiveonly":
# If we have a receiveonly +host match, everyone else is sendreceive by default.
pass
# Validate sync direction again
if sync_direction not in valid_synctypes:
raise FolderCtlError(f"After processing, sync_direction was invalid: {sync_direction} (should be one of: {','.join(sorted(valid_synctypes))})")
# Build a folder-spec
folderspec = FolderSpec(
name=result,
path=resolve_label_to_path(home_dir, effective_folderlabel),
allowed_hosts=allowed_hosts,
sync_direction=sync_direction,
)
expanded.append(folderspec)
return expanded
def resolve_label_to_path(home_dir: Path, label: str) -> Path:
return home_dir / label
STIGNORE_STANDARD_CONTENT = """# AUTOGENERATED FILE
#include .stglobalignore
#include .stlocalignore
#include .stsharedignore
.stlocalignore")
"""
def write_ignore_files(syncthing_session: requests.Session, hostctl_file: Path, folderctl_file: Path, stglobalignore_file: Path, dryrun=False):
def log(msg):
if dryrun:
msg = f"(dryrun) {msg}"
logger.info(msg)
expanded = expand_folderctl(syncthing_session, hostctl_file, folderctl_file)
for folder in expanded:
if not dryrun:
folder.path.mkdir(parents=True, exist_ok=True)
log(f"Created: {folder.path.as_posix()}")
# Read the stglobalignore file
stignore_data = stglobalignore_file.read_text()
for resolved_folder in expanded:
stignore_global_path = resolved_folder.path / ".stglobalignore"
stignore_shared_path = resolved_folder.path / ".stsharedignore"
stignore_local_path = resolved_folder.path / ".stlocalignore"
stignore_file_path = resolved_folder.path / ".stignore"
# Don't overwrite the actual stglobalignore source file path
if stignore_global_path != stglobalignore_file:
existed = stignore_global_path.exists()
if not dryrun:
stignore_global_path.write_text(stignore_data)
if existed:
log(f"Updated {stignore_global_path.as_posix()}")
else:
log(f"Created {stignore_global_path.as_posix()}")
if not stignore_shared_path.exists():
if not dryrun:
stignore_shared_path.touch()
log(f"Created {stignore_shared_path.as_posix()}")
if not stignore_local_path.exists():
if not dryrun:
stignore_local_path.touch()
log(f"Created {stignore_local_path.as_posix()}")
stignore_existed = stignore_file_path.exists()
if not dryrun:
stignore_file_path.write_text(STIGNORE_STANDARD_CONTENT)
if stignore_existed:
log(f"Updated {stignore_file_path.as_posix()}")
else:
log(f"Created {stignore_file_path.as_posix()}")
class SyncthingAPIError(Exception):
"""Exception raised when we're talking to syncthing"""
def _syncthing_session(syncthing_url: str, syncthing_apikey: str) -> requests.Session:
# Setup the share sync
session = RelativeSession(syncthing_url)
session.headers = {
"X-API-Key": syncthing_apikey,
}
logger.debug("Checking Syncthing health")
health_result = session.get("/rest/noauth/health")
try:
health_result.raise_for_status()
except Exception as e:
raise SyncthingAPIError("Bad response from healthcheck") from e
if health_result.json()["status"] != "OK":
raise SyncthingAPIError("Syncthing up but status endpoint reports not OK")
return session
def _syncthing_localid(syncthing_session: requests.Session) -> str:
logger.debug("Requesting current status")
syncthing_status = syncthing_session.get("/rest/system/status").json()
# Get the local device ID so we can evaluate whether we think we should have a folder
local_deviceid = normalize_device_id(syncthing_status["myID"])
return local_deviceid
def _syncthing_homedir(syncthing_session: requests.Session) -> Path:
logger.debug("Requesting current status")
syncthing_status = syncthing_session.get("/rest/system/status").json()
# Get the local device ID so we can evaluate whether we think we should have a folder
home_dir = Path(syncthing_status["tilde"])
return home_dir
def sync_devices(syncthing_session: requests.Session, hostctl_file: Path, dryrun=False):
"""Validate that all expected devices are connected"""
def log(msg):
if dryrun:
msg = f"(dryrun) {msg}"
logger.info(msg)
expected_hosts = get_known_devices(hostctl_file)
expected_name_to_id = { e.name:e.device_id for e in expected_hosts }
expected_id_to_name = { v:k for k,v in expected_name_to_id.items() }
logger.debug("Requesting configured devices")
syncthing_devices = syncthing_session.get("/rest/config/devices").json()
device_name_to_id = { device["name"]:device["deviceID"] for device in syncthing_devices }
device_id_to_name = { v:k for k,v in device_name_to_id.items() }
logger.debug("Finding missing devices (nothing will be removed if there's more then expected)")
missing_devices = {}
for name, device_id in expected_name_to_id.items():
if device_id not in device_id_to_name:
logger.info(f"Found missing device from Syncthing config: {name} {device_id}")
missing_devices[name] = device_id
if device_id in device_id_to_name:
if name != device_id_to_name[device_id]:
logger.warning(f"Device is added but does not have expected name: want {name} got {device_id_to_name[device_id]} for {device_id}")
if len(missing_devices) == 0:
logger.info("All expected devices are synced.")
return
logger.info("Adding configuration for missing devices")
errors = {}
for name, device_id in missing_devices.items():
device_request = syncthing_session.get("/rest/config/defaults/device").json()
device_request["deviceID"] = device_id
device_request["name"] = name
log(f"Adding configuration for {name} ({device_id})")
if not dryrun:
try:
r = syncthing_session.post("/rest/config/devices", json=device_request)
r.raise_for_status()
except Exception as e:
errors[(name,device_id)] = e
logger.error(f"Error while adding device {name} ({device_id})")
else:
log(f"Successfully added configuration for {name} ({device_id})")
if not dryrun:
if len(errors) == len(missing_devices):
raise SyncthingAPIError("Error: NO devices were successfully added")
elif len(errors) > 0:
raise SyncthingAPIError("Error: not all devices were successfully added")
def names_from_hostspecs(hostspecs: Iterable[HostSpec]) -> Tuple[str,...]:
"""Format hostspecs as names"""
return tuple( hostspec.name for hostspec in hostspecs )
def sync_shares(syncthing_session: requests.Session, hostctl_file: Path, folderctl_file: Path, dryrun=False, add_paths=False, remove_devices=False, change_directions=False):
"""
Sync the list of shares from folderctl into Syncthing.
Parameters
----------
syncthing_session
Requests session connected to syncthing
hostctl_file
Known hosts path
folderctl_file
Shared folder specification file
dryrun
Don't execute any changes - just print what would be done.
add_paths
Add missing paths to the cluster as newly shared folders provided there are some
other device IDs present on the cluster.
remove_devices
Remove extraneous devices from shared folder definitions. This will only remove
devices which are known to the cluster (i.e. in hostctl).
change_directions
Change sync directions on folders to match calculated directions
Returns
-------
"""
def log(msg):
if dryrun:
msg = f"(dryrun) {msg}"
logger.info(msg)
known_devices = get_known_devices(hostctl_file)
known_deviceids_to_devices = { hostspec.device_id:hostspec for hostspec in known_devices }
# Get the local device ID so we can evaluate whether we think we should have a folder
local_deviceid = _syncthing_localid(syncthing_session)
home_dir = _syncthing_homedir(syncthing_session)
logger.debug("Requesting currently configured devices")
cluster_devices = syncthing_session.get("/rest/config/devices").json()
remote_devices = { device["deviceID"]:device for device in cluster_devices if normalize_device_id(device["deviceID"]) != local_deviceid }
expanded = [ spec for spec in expand_folderctl(syncthing_session, hostctl_file, folderctl_file, local_deviceid=local_deviceid,) if local_deviceid in spec.allowed_device_ids ]
logger.debug("Requesting configured folders")
syncthing_folders = syncthing_session.get("/rest/config/folders").json()
logger.debug("Requesting pending folders")
cluster_folders = syncthing_session.get("/rest/cluster/pending/folders").json()
# Get a list of folders on the cluster. In an active cluster, all our configured folders are already
# here.
cluster_folderlabel_to_id = {}
for folder_id, offers in cluster_folders.items():
labels = { v["label"] for v in offers["offeredBy"].values() }
if len(labels) > 1:
logger.warning(f"Received multiple labels for an offered folder - ignoring: {repr(labels)} for {folder_id}")
continue
# Get the primary label
label = tuple(labels)[0]
cluster_folderlabel_to_id[label] = folder_id
# Turn the labels we found into paths by resolving them. We'll use this to identify paths we want.
cluster_folderpath_to_id = { resolve_label_to_path(home_dir, k):v for k,v in cluster_folderlabel_to_id.items() }
logger.debug("Finding missing folders (nothing will be removed if there's more)")
folder_id_to_path = { sf["id"]:Path(sf["path"]) for sf in syncthing_folders }
folder_id_to_folder = { sf["id"]:sf for sf in syncthing_folders }
folder_path_to_id = { v:k for k,v in folder_id_to_path.items() }
missing_paths: List[FolderSpec] = [] # folderspecs which aren't configured
incorrect_direction: Dict[str,FolderSpec] = {} # folderspecs with the wrong direction config
incomplete_devices: Dict[str,Iterable[HostSpec]] = {} # folder IDs with not enough devices
overcomplete_devices: Dict[str,Iterable[HostSpec]] = {} # folder IDs exceeding their known devices
for folderspec in expanded:
if folderspec.path not in folder_path_to_id:
# Check if this folder-spec should be shared to us.
if local_deviceid in folderspec.allowed_device_ids:
logger.info(f"Found missing folder from Syncthing config: {folderspec.name} {folderspec.path.as_posix()}")
missing_paths.append(folderspec)
else:
current_folderid = folder_path_to_id[folderspec.path]
current_folder = folder_id_to_folder[current_folderid]
# Check if this folder has the right direction on it
if current_folder["type"] != folderspec.sync_direction:
incorrect_direction[current_folderid] = folderspec
logger.info(f"Found folder with incorrect sync direction: {folderspec.name} is { current_folder['type']} but should be {folderspec.sync_direction}")
# Check if this folder has all the devices shared which it should
current_devices = { normalize_device_id(device["deviceID"]) for device in current_folder["devices"] }
missing_device_ids = folderspec.allowed_device_ids.difference(current_devices)
missing_hostspecs = [hostspec for hostspec in folderspec.allowed_hosts if hostspec.device_id in missing_device_ids]
if len(missing_device_ids) > 0:
logger.info(f"Found folder not being shared to all relevant devices: {folderspec.name} is missing hosts {','.join(names_from_hostspecs(missing_hostspecs))}")
incomplete_devices[current_folder["id"]] = missing_hostspecs
# Check if this folder has known devices it shouldn't have, provided they are already known to the configuration
extra_device_ids = { device_id for device_id in current_devices.difference(folderspec.allowed_device_ids) if device_id in known_deviceids_to_devices }
extra_hostspecs = [ known_deviceids_to_devices[device_id] for device_id in extra_device_ids ]
if len(extra_device_ids) > 0:
logger.info(f"Found folder being shared with too many devices: {folderspec.name} is has extra hosts {','.join(names_from_hostspecs(extra_hostspecs))}")
overcomplete_devices[current_folder["id"]] = extra_hostspecs
# logger.debug("Checking configured folders exist locally")
# for folderspec in expanded:
# if not folderspec.path.exists():
# log(f"Creating folder for configured path: {folderspec.name} {folderspec.path.as_posix()}")
# if not dryrun:
# folderspec.path.mkdir(parents=True, exist_ok=True)
# log(f"Created folder for configured path: {folderspec.name} {folderspec.path.as_posix()}")
if len(missing_paths) == 0 and len(incomplete_devices) == 0 and len(overcomplete_devices) == 0 and len(incorrect_direction) == 0:
logger.info("All configured folders are in sync")
return
existing_folderids = set(cluster_folders.keys()) | set(folder_id_to_path.keys())
errors = {}
if len(missing_paths) > 0:
logger.info("Add configuration for missing folders")
for folderspec in missing_paths:
if folderspec.path not in cluster_folderpath_to_id:
if not add_paths:
logger.warning(
f"No folder in the cluster available for this path - cannot add it: {folderspec.name} -> {folderspec.path}")
continue
if len(remote_devices) == 0:
logger.warning(
f"No folder in the cluster available, add_paths requested but no other devices are connected either - not proceeding: {folderspec.name} -> {folderspec.path}")
continue
folder_id = new_syncthing_folder_id(existing_folderids)
logger.info(f"No folder in the cluster available for this path - adding as new folder to cluster: {folderspec.name} -> {folderspec.path} @ {folder_id}")
else:
folder_id = cluster_folderpath_to_id[folderspec.path]
logger.info(f"Found an advertised folder for the missing folder: {folderspec.name} -> {folderspec.path} @ {folder_id}")
folder_template = syncthing_session.get("/rest/config/defaults/folder").json()
folder_template["id"] = folder_id
folder_template["label"] = folderspec.name
folder_template["path"] = folderspec.path.as_posix()
folder_template["type"] = folderspec.sync_direction
folder_template["devices"].extend([
{"deviceID": hostspec.device_id} for hostspec in folderspec.allowed_hosts if hostspec.device_id != local_deviceid ])
shared_hostnames = [ hostspec.name for hostspec in folderspec.allowed_hosts ]
log(f"Adding folder to syncthing: {folderspec.name} -> {folderspec.path} @ {folder_id} (shared with: {','.join(shared_hostnames)})")
if not dryrun:
try:
r = syncthing_session.post("/rest/config/folders", json=folder_template)
r.raise_for_status()
except Exception as e:
errors[(folderspec.name,folder_id)] = e
logger.error(f"Error while adding folder {folderspec.name} ({folder_id})")
else:
log(f"Successfully added folder to syncthing: {folderspec.name} -> {folderspec.path} @ {folder_id}")
update_errors = {}
if len(incomplete_devices) > 0:
logger.info("Updating device sharing for incomplete folders")
for folder_id, missing_hostspecs in incomplete_devices.items():
current_folder = syncthing_session.get(f"/rest/config/folders/{folder_id}").json()
current_label = current_folder["label"]
devices = current_folder["devices"]
for hostspec in missing_hostspecs:
devices.append({
"deviceID": hostspec.device_id,
})
log(f"Adding additional devices to folder share list: {current_label} ({folder_id}) added: {','.join(names_from_hostspecs(missing_hostspecs))}")
if not dryrun:
try:
r = syncthing_session.patch(f"/rest/config/folders/{folder_id}", json={
"devices": devices,
})
r.raise_for_status()
except Exception as e:
update_errors[folder_id] = e
logger.error(f"Error while updating the device share list for folder: {current_label} ({folder_id}) added: {','.join(names_from_hostspecs(missing_hostspecs))}")
else:
log(f"Successfully updated the device share list for folder: {current_label} ({folder_id}) added: {','.join(names_from_hostspecs(missing_hostspecs))}")
extra_errors = {}
if remove_devices and len(overcomplete_devices) > 0:
logger.info("Updating device sharing for overshared folders")
for folder_id, extra_hostspecs in overcomplete_devices.items():
current_folder = syncthing_session.get(f"/rest/config/folders/{folder_id}").json()
current_label = current_folder["label"]
devices = []
extra_deviceids = { hostspec.device_id for hostspec in extra_hostspecs }
for device in current_folder["devices"]:
if device["deviceID"] in extra_deviceids:
continue
devices.append(device)
log(f"Removing extraneous devices from folder share list: {current_label} ({folder_id}) removed: {','.join(names_from_hostspecs(extra_hostspecs))}")
if not dryrun:
try:
r = syncthing_session.patch(f"/rest/config/folders/{folder_id}", json={
"devices": devices,
})
r.raise_for_status()
except Exception as e:
extra_errors[folder_id] = e
logger.error(f"Error while updating the device share list for folder: {current_label} ({folder_id}) removed: {','.join(names_from_hostspecs(extra_hostspecs))}")
else:
log(f"Successfully updated the device share list for folder: {current_label} ({folder_id}) removed: {','.join(names_from_hostspecs(extra_hostspecs))}")
change_errors = {}
if change_directions and len(incorrect_direction) > 0:
logger.info("Updating folder sync directions for incorrect folders")
for folder_id, folder_spec in incorrect_direction.items():
current_folder = syncthing_session.get(f"/rest/config/folders/{folder_id}").json()
current_label = current_folder["label"]
current_direction = current_folder["type"]
log(f"Changing folder sync direction: {current_label} ({folder_id}) {current_direction} -> {folder_spec.sync_direction}")
if not dryrun:
try:
r = syncthing_session.patch(f"/rest/config/folders/{folder_id}", json={
"type": folder_spec.sync_direction,
})
r.raise_for_status()
except Exception as e:
change_errors[folder_id] = e
logger.error(
f"Error while updating the device sync direction for folder: {current_label} ({folder_id}) {current_direction} -> {folder_spec.sync_direction}")
else:
log(f"Successfully updated the sync direction for folder: {current_label} ({folder_id}) {current_direction} -> {folder_spec.sync_direction}")
if not dryrun:
if len(errors) != 0 or len(update_errors) != 0 or len(extra_errors) != 0 or len(change_errors) != 0:
raise SyncthingAPIError(f"Errors during folder update: {len(errors)} folder add errors, {len(update_errors)} folder device update errors, {len(extra_errors)} folder device removal errors, {len(change_errors)} folder direction change errors")
def format_table(wr: TextIO, rows):
"""Simple table formatting command"""
# Convert items to strings
parsed_rows = []
for row in rows:
if isinstance(row, str):
parsed_rows.append((row,))
elif isinstance(row, bytes):
parsed_rows.append((row.decode(),))
elif isinstance(row, Iterable):
parsed_rows.append(tuple( str(col) for col in row ))
else:
parsed_rows.append((str(row),))
col_maxwidth = {}
for row in parsed_rows:
for idx, item in enumerate(row):
col_maxwidth[idx] = max(col_maxwidth.get(idx, 0), len(item))
# Output the table
for row in parsed_rows:
output_items = [ item.ljust(col_maxwidth[idx], " ") for idx, item in enumerate(row) ]
wr.write(" ".join(output_items))
wr.write("\n")
@dataclass
class Config:
syncthing_session: requests.Session
config_root: Path = Path("~").expanduser() / "config" / "shared"
stdin: TextIO = sys.stdin
stdout: TextIO = sys.stdout
stderr: TextIO = sys.stderr
dryrun: bool = False
@property
def folderctl_file(self):
return self.config_root / "folderctl"
@property
def stglobalignore_file(self):
return self.config_root / ".stglobalignore"
@property
def hostctl_file(self):
return self.config_root / "hostctl"
def cli_hosts(config: Config, **kwargs):
format_table(config.stdout, [ host.to_line() for host in get_known_devices(config.hostctl_file) ] )
def cli_effective_folderctl(config: Config, **kwargs):
as_host: Optional[str] = kwargs.get("as_host", None)
if as_host is None:
local_deviceid = _syncthing_localid(config.syncthing_session)
else:
known_hosts: Dict[str,HostSpec] = { hostspec.name:hostspec for hostspec in get_known_devices(config.hostctl_file) }
try:
local_deviceid = known_hosts[as_host].device_id
except Exception as e:
raise KeyError(f"No host found for name: {as_host} (out of {','.join(known_hosts.keys())})")
result = expand_folderctl(config.syncthing_session, config.hostctl_file, config.folderctl_file, local_deviceid=local_deviceid)
if kwargs.get("resolve", False):
format_table(config.stdout, [spec.path for spec in result])
else:
format_table(config.stdout, [spec.name for spec in result])
def cli_write_ignore_files(config: Config, **kwargs):
write_ignore_files(config.syncthing_session, config.hostctl_file, config.folderctl_file, config.stglobalignore_file, dryrun=config.dryrun)
def cli_update_devices(config: Config, **kwargs):
"""Update folder shares in Syncthing"""
sync_devices(config.syncthing_session, config.hostctl_file, dryrun=config.dryrun)
def cli_update_shares(config: Config, **kwargs):
"""Update folder shares in Syncthing"""
sync_shares(config.syncthing_session, config.hostctl_file, config.folderctl_file,
dryrun=config.dryrun, add_paths=kwargs.get("add_paths", False),
remove_devices=kwargs.get("remove_devices", False),
change_directions=kwargs.get("change_directions", False))
def cli_log(config: Config, **kwargs):
"""Debug function for logging"""
getattr(logger, kwargs["level"])(kwargs["msg"])
def argparse_dir_path(argument: str) -> Path:
result = Path(argument)
if not result.is_dir():
raise NotADirectoryError(f"{argument} is not a directory")
return result
description = """
Manage Syncthing Shares and Ignore files.
"""
prog_name = Path(__file__).absolute().relative_to(Path(".").absolute(), walk_up=True)
parser = argparse.ArgumentParser(prog_name.as_posix(), description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--log-level", choices=[ l.lower() for l in logging._nameToLevel.keys()], default="info", help="Log Level")
parser.add_argument("--log-format", choices=[ l.lower() for l in _LOG_FORMATS.keys()], default="console", help="Log Format")
parser.add_argument("--log-dest", choices=[ l.lower() for l in _LOG_TARGETS.keys()], default="stderr", help="Log Target")
parser.add_argument("--home-dir", type=argparse_dir_path, default=(Path("~").expanduser()).as_posix(), help="Home directory to resolve paths against")
parser.add_argument("--config-root", type=argparse_dir_path, default=(Path("~").expanduser() / "config" / "shared").as_posix(), help="Root Configuration directory")
parser.add_argument("--syncthing-url", type=str, help="Syncthing Endpoint Path")
parser.add_argument("--syncthing-apikey", type=str, help="Syncthing API Key (autodiscovered if not specified)")
parser.add_argument("--dryrun", "--dry-run", action="store_true", default=False, help="Don't execute any changes")
parser.set_defaults(stdout=sys.stdout, stdin=sys.stdin, stderr=sys.stderr)
subparsers = parser.add_subparsers()
sparser = subparsers.add_parser("hosts")
sparser.set_defaults(fn=cli_hosts)
sparser = subparsers.add_parser("effective-folderctl")
sparser.add_argument("--resolve", action="store_true", help="Resolve folder paths")
sparser.add_argument("--as-host", type=str, help="Resolve folder paths as a particular host ID")
sparser.set_defaults(fn=cli_effective_folderctl)
sparser = subparsers.add_parser("update-devices")
sparser.set_defaults(fn=cli_update_devices)
sparser = subparsers.add_parser("update-folders")
sparser.add_argument("--add-paths", action="store_true", help="Add missing folders to the cluster from the local device")
sparser.add_argument("--remove-devices", action="store_true", help="Remove extra device share links from folders if they're known to us")
sparser.add_argument("--change-directions", action="store_true", help="Change direction settings on folders which are incorrect")
sparser.set_defaults(fn=cli_update_shares)
sparser = subparsers.add_parser("log")
sparser.add_argument("level", help="Log Level to log at")
sparser.add_argument("msg", help="Log message to emit")
sparser.set_defaults(fn=cli_log)
def main(argv):
args = parser.parse_args(argv)
configure_logging(args.log_level, args.log_format, args.log_dest)
syncthing_apikey = args.syncthing_apikey
syncthing_url = args.syncthing_url
# Discover from normal locations
config_xml_path = None
candidate_paths = [Path("~").expanduser() / ".config" / "syncthing" / "config.xml"]
if "APPDATA" in os.environ:
candidate_paths.append(Path(os.environ["APPDATA"]) / "syncthing" / "config.xml")
if "LOCALAPPDATA" in os.environ:
candidate_paths.append(Path(os.environ["LOCALAPPDATA"]) / "syncthing" / "config.xml")
for candidate_path in candidate_paths:
if candidate_path.exists():
config_xml_path = candidate_path
logger.debug(f"Found config.xml at {config_xml_path}")
break
if syncthing_apikey is None:
if config_xml_path is not None:
try:
root = etree.fromstring(config_xml_path.read_text())
except:
pass
try:
syncthing_apikey = root.xpath("/configuration/gui/apikey")[0].text
except:
pass
if syncthing_url is None:
if config_xml_path is not None:
try:
root = etree.fromstring(config_xml_path.read_text())
except:
pass
try:
syncthing_url = root.xpath("/configuration/gui/address")[0].text
syncthing_url = f"http://{syncthing_url}"
except:
pass
syncthing_session = _syncthing_session(syncthing_url, syncthing_apikey)
config = Config(
syncthing_session=syncthing_session,
config_root=args.config_root,
dryrun=args.dryrun,
stdin=args.stdin,
stdout=args.stdout,
stderr=args.stderr,
)
try:
return args.fn(config, **vars(args))
except Exception as e:
config.stderr.write(f"{e.__class__.__name__}: {e}\n")
return 1
if __name__ == "__main__":
try:
ret = main(sys.argv[1:])
except Exception as e:
sys.stderr.write((f"{e.__class__.__name__}: {e}\n"))
sys.exit(1)
sys.exit(ret if ret else 0)
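# Example invocations (assuming this script is installed on PATH as `update-shares` and the config
# files live under ~/config/shared):
#   update-shares hosts                          # list the hosts parsed from hostctl
#   update-shares effective-folderctl --resolve  # show the folder paths this host should end up with
#   update-shares --dryrun update-devices        # preview adding missing devices to Syncthing
#   update-shares --dryrun update-folders --add-paths --change-directions  # preview folder creation/sharing fixes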