@XVilka
Created April 9, 2020 10:17
Dotfiles Synchronization Script
#!/usr/bin/env python3
# pylint: disable=missing-docstring
# pylint: disable=invalid-name
# pylint: disable=wrong-import-position
# pylint: disable=import-outside-toplevel
# pylint: disable=logging-format-interpolation
"""Dotfiles Synchronization Script
This script lets you synchronize your dotfiles among different computers with
varying environments and installed programs. It can work locally and remotely through
an SSH tunnel.
It has two main modes - local and remote. The files to be synchronized are described
in a JSON file with the following structure:
.. code-block:: json
    {
        "configs": {
            "single entry": {
                "config": "repository/local/path",
                "location": "target/location"
            },
            "category": {
                "single entry": {
                    "config": "repository/local/path",
                    "location": "target/location",
                    "ssh": false,
                    "msys": true
                },
                ...
            },
            ...
        }
    }
The structure of the config allows the dotfiles layout in the local storage to differ
from the location of the real config files. By default the script reads "./configfiles.json",
but its location can be specified with the ``-l`` option.
The usual workflow is to keep a "~/dotfiles" directory - a git repository with your files
that can be synchronized through GitHub or any other service - and then run ``sync.py``
to update your real configuration files.
The default base path for copying files is ``$HOME``, but a different one can be specified
with the ``-p`` option. A configuration entry can contain platform arguments,
such as ``"ssh": false`` and ``"msys": true``. In that example the file will not
be copied in remote mode, but will be installed for the MSYS2 configuration
on Windows. The supported platforms are: "msys", "docker", "ssh".
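As a purely illustrative sketch (the entry names and paths below are made up), a
``configfiles.json`` combining these options could look like:
.. code-block:: json
    {
        "configs": {
            "shell": {
                "zsh": {
                    "config": "zsh/zshrc",
                    "location": ".zshrc",
                    "ssh": true,
                    "msys": false
                }
            }
        }
    }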
The script can copy the files to the target location (``-c`` option) or just create symlinks
(the default mode). If the target file is already in place, replacing it (or fixing the
symbolic link) can be forced with the ``-r`` option.
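For example, assuming the repository lives in "~/dotfiles", typical local invocations
might look like this (illustrative only):
.. code-block:: txt
    ./sync.py              # create symlinks, keep files that already exist
    ./sync.py -r           # create symlinks, replacing existing files or fixing links
    ./sync.py -c -p /tmp   # copy the files to /tmp instead of $HOME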
Remote synchronization lets you upload the files to a remote machine through either a direct
SSH connection or an SSH tunnel. The script reads the current SSH config file (e.g. "~/.ssh/config")
and loads the settings from there. For example, if you have these lines in your SSH config:
.. code-block:: txt
Host myhost
HostName cute.domain.address.io
Port 9999
User myuser
IdentityFile ~/.ssh/myhost_rsa
it will allow you to specify just the name of the host: ``sync.py -m remotessh myhost``.
If you want to use an SSH tunnel instead, the corresponding mode is ``-m sshtunnel``. It also
reads the settings from your SSH config:
.. code-block:: txt
Host server1
HostName cute.address.io
IdentityFile ~/.ssh/myhost_rsa
Host server2_behind_server1
HostName ugly.address.io
IdentityFile ~/.ssh/myhost_rsa
ProxyCommand ssh server1 -W %h:%p
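With a config like the one above, the tunnel mode could then be invoked, for example, as:
.. code-block:: txt
    ./sync.py -m sshtunnel server2_behind_server1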
The script requires only two external Python packages:
* paramiko
* sshtunnel
Neither is required if only the local mode is used.
"""
import os
import os.path as path
import sys
import argparse
import shutil
import json
import configparser
import getpass
import logging
if sys.version_info < (3, 7):
    raise Exception("Only Python >= 3.7 is supported!")
from enum import Enum
from dataclasses import dataclass
logging.basicConfig(format="%(message)s", level=logging.DEBUG)
# --------------------------------------------------------------------
# 1. Check if all needed executables and libraries are installed
missing_modules_message = """
paramiko and sshtunnel are not found!
Please install them using the following command:
pip3 install -U --user paramiko sshtunnel
"""
def check_module(package_name):
    import importlib.util
    return importlib.util.find_spec(package_name) is not None
# -----------------------------------------------------------------------------
# Various helpers
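# Return True if the entry explicitly disables the platform currently being processed,
# e.g. an entry with "ssh": false is skipped when files are installed over SSH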
def ignore_platform(item, current, target):
if target in item:
if (not item[target]) and (current == target):
return True
return False
return False
# Check the symlink and recreate if broken
def recreate_symlink(dest, source):
target_path = os.readlink(dest)
if not path.isabs(target_path):
target_path = path.join(path.dirname(dest), target_path)
    if target_path != source:
# remove file
logging.info("Removing broken link {0} -> {1}".format(dest, target_path))
os.remove(dest)
logging.info("Creating symlink {0} -> {1}".format(dest, source))
os.symlink(source, dest)
else:
logging.warning("Symlink is already valid {0} -> {1}".format(dest, target_path))
# Replace file or symlink if exists already
def replace_files(dest, source):
if path.islink(dest):
recreate_symlink(dest, source)
elif path.isfile(dest):
# remove file
logging.warning("Removing regular file {0}".format(dest))
os.remove(dest)
logging.info("Creating symlink {0} -> {1}".format(dest, source))
os.symlink(source, dest)
else:
destdir = path.dirname(dest)
if path.exists(destdir):
logging.info("Creating symlink {0} -> {1}".format(dest, source))
os.symlink(source, dest)
else:
logging.error("Directory {0} doesn't exist".format(destdir))
# Plain symlink without replace
def symlink_files(dest, source):
if (not path.isfile(dest)) and (not path.islink(dest)):
dest_dir = path.dirname(dest)
if path.exists(dest_dir):
logging.info("Creating symlink {0} -> {1}".format(dest, source))
os.symlink(source, dest)
else:
logging.error("Directory {0} doesn't exist".format(dest_dir))
else:
logging.warning("File {0} exists".format(dest))
# -----------------------------------------------------------------------------
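# Transfer modes selectable on the command line with "-m"; the names match the CLI choices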
class TransferMode(Enum):
localcopy = 1
remotessh = 2
sshtunnel = 3
def __str__(self):
return self.name
def __repr__(self):
return str(self)
@staticmethod
def argparse(s):
        try:
            return TransferMode[s]
        except KeyError:
            # Raising ValueError makes argparse report an invalid choice
            raise ValueError("Unknown transfer mode: {0}".format(s))
# -----------------------------------------------------------------------------
# A wrapper for paramiko and sshtunnel to not require these modules
# if the remotessh/tunnel modes are not activated
class SSHFunctions():
def __init__(self):
# Do not check if it's just a local copy
        if (not check_module("paramiko")) or (not check_module("sshtunnel")):
raise Exception(missing_modules_message)
import paramiko
# TODO: Switch to the vanilla paramiko?
import sshtunnel
# A workaround for static analyzers
self.paramiko = paramiko
self.sshtunnel = sshtunnel
def config_parser(self, target, configpath):
        # Initialize the configuration (default to the standard SSH port)
        cfg = {'hostname': target.machine, 'port': 22, 'username': None, 'password': None}
# Parse the system SSH configuration file
ssh_config = self.paramiko.SSHConfig()
# TODO: Allow different location?
user_config_file = os.path.expanduser("~/.ssh/config")
if os.path.exists(user_config_file):
with open(user_config_file) as f:
ssh_config.parse(f)
        # Search for the specified machine name (hostname) in the SSH config.
        # Note: lookup() always returns at least the 'hostname' key, even for unknown hosts
        user_config = ssh_config.lookup(cfg['hostname'])
        # If a real entry exists - use the SSH configuration
        if len(user_config) > 1:
            for k in ('hostname', 'port'):
                if k in user_config:
                    cfg[k] = user_config[k]
            # paramiko exposes the "User" directive under the 'user' key
            if 'user' in user_config:
                cfg['username'] = user_config['user']
# Load also the key file if present
if 'identityfile' in user_config:
cfg['key_filename'] = user_config['identityfile'][0]
cfg['look_for_keys'] = False
# TODO: Add support for `ProxyJump` directive
if 'proxycommand' in user_config:
cfg['sock'] = self.paramiko.ProxyCommand(user_config['proxycommand'])
            # If the username is not in the SSH config - use the current one
            if 'user' not in user_config:
                cfg['username'] = getpass.getuser()
# TODO: Maybe ask the user/password interactively instead?
logging.debug("Loaded SSH connection configuration from {0}".format(user_config_file))
# If it doesn't - try to load the custom configuration file
else:
# TODO: Allow the keys auth in the configuration options
# Read the settings from configuration file
conf = configparser.ConfigParser()
conf.read([os.path.expanduser(configpath)])
sect = "SSH"
if sect in conf.sections():
cfg['hostname'] = conf.get(sect, "REMOTEHOST")
cfg['port'] = conf.get(sect, "REMOTEPORT")
cfg['username'] = conf.get(sect, "USERNAME")
# TODO: Allow to specify the key!
cfg['password'] = conf.get(sect, "PASSWORD")
logging.debug("Loaded SSH connection configuration from {0}".format(configpath))
else:
return None
return cfg
# Separate connection for copying the files over
def createSSHClient(self, cfg):
self.paramiko.util.log_to_file("sync_ssh_upload.log")
client = self.paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(self.paramiko.AutoAddPolicy())
logging.debug("Connecting to {0}@{1}:{2}..."
.format(cfg['username'], cfg['hostname'], cfg['port']))
client.connect(**cfg)
return client
    # Forwarder for anything other than just copying files
def createSSHForwarder(self, sshtunconf):
server = self.sshtunnel.SSHTunnelForwarder(
(sshtunconf.host, int(sshtunconf.port)),
ssh_username=sshtunconf.username,
ssh_password=sshtunconf.password,
remote_bind_address=('127.0.0.1', int(sshtunconf.remote_port)))
return server
# -----------------------------------------------------------------------------
class SSHConfig():
def __init__(self, functions, target, mode=TransferMode.localcopy, configpath="sync.conf"):
self.ssh = None
# TODO: Parse the URLs like "user:password@machine.name:port"
if target and target.machine:
self.ssh = functions.config_parser(target, configpath)
if (mode is not TransferMode.localcopy) and (self.ssh is None):
raise Exception("Error - can't find suitable configuration for remote installation!")
else:
if mode is not TransferMode.localcopy:
raise ValueError("Error - target can't be empty!")
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# For accessing everything through SSH tunnel
# TODO: Move inside the SSHFunctions maybe?
# TODO: Load ssh tunnels also from the .ssh/config file (like in SSHConfig class)
class DotfilesSSHTunnel():
def __init__(self, config, functions):
self.config = config
self.functions = functions
self.tunnels = []
def find_forward(self, remote_port):
result = None
for t in self.tunnels:
if t['remote_port'] == remote_port:
result = t
return result
# Returns local bind port number
def add_forward(self, remote_port):
def create_tunnel():
sshtunnel = self.functions.sshtunnel
tunnel = sshtunnel.SSHTunnelForwarder(
(self.config.host, int(self.config.port)),
ssh_username=self.config.username,
ssh_password=self.config.password,
remote_bind_address=('localhost', int(remote_port)),
logger=sshtunnel.create_logger(loglevel="ERROR")
)
if tunnel:
tunnel.start()
local_port = str(tunnel.local_bind_port)
forwarder = {
'remote_port': remote_port,
'local_port': local_port,
'tunnel': tunnel,
}
self.tunnels.append(forwarder)
logging.debug("Forward for {0} port successful!".format(remote_port))
logging.debug("Added:\n{0}".format(forwarder))
return
create_tunnel()
return self.find_forward(remote_port)
def del_forward(self, remote_port):
t = self.find_forward(remote_port)
if t:
tunnel = t['tunnel']
tunnel.stop()
self.tunnels.remove(t)
# -----------------------------------------------------------------------------
class _SSH():
def __init__(self, sshfunctions):
self.sftpsession = None
self.tunnel = None
self.functions = sshfunctions
# High-level access for files transfer
class DotfilesTransferSSH():
def __init__(self, mode, config, sshfunctions):
self.mode = mode
self.config = config
# can contain ssh.tunnel, ssh.sftpsession, etc
self.ssh = _SSH(sshfunctions)
def ssh_start(self):
# Start SSH server
if self.config and self.config.ssh:
logging.info("Connecting to the remote machine...")
ssh = self.ssh.functions.createSSHClient(self.config.ssh)
sftpclient = self.ssh.functions.paramiko.sftp_client.SFTPClient
self.ssh.sftpsession = sftpclient.from_transport(ssh.get_transport())
else:
logging.error("Error initializing SSH tranfer - missing configuration!")
def start(self):
if self.mode == TransferMode.sshtunnel:
# Initialize tunnel first
logging.info("Initializing SSH tunnel...")
elif self.mode == TransferMode.remotessh:
# Simple SFTP copy
self.ssh_start()
else:
raise Exception("Wrong mode!")
# ------------------------------------------------------------------------------
def ssh_close(self):
if self.ssh and self.ssh.sftpsession:
self.ssh.sftpsession.close()
if self.ssh and self.ssh.tunnel:
self.ssh.tunnel.stop()
def close(self):
if self.mode == TransferMode.remotessh or self.mode == TransferMode.sshtunnel:
logging.info("Closing all remote SSH connections...")
self.ssh_close()
else:
raise Exception("Wrong mode!")
# ------------------------------------------------------------------------------
# Copy files over SSH
def ssh_copy(self, dest, source):
try:
self.ssh.sftpsession.put(source, dest, confirm=True)
logging.info("SFTP: {0} successfully uploaded".format(source))
return True
except Exception as e:
# TODO: Check if file exists (check size)
logging.error("SFTP: {0} error during {1} uploading".format(e, source))
return False
def copy(self, dest, source):
if self.mode == TransferMode.sshtunnel or self.mode == TransferMode.remotessh:
# SFTP copy
logging.info("SFTP: copying {0} to {1}...".format(source, dest))
self.ssh_copy(dest, source)
else:
raise Exception("Wrong mode!")
# High-level access for files transfer
class DotfilesTransferLocal():
def __init__(self, mode):
self.mode = mode
def start(self):
logging.info("Starting to copy the local files...")
# ------------------------------------------------------------------------------
def close(self):
pass
# Copy files and create directories if required
def copy(self, dest, source):
logging.info('Copying {0} to {1}'.format(source, dest))
os.makedirs(os.path.dirname(dest), exist_ok=True)
shutil.copyfile(source, dest)
# -----------------------------------------------------------------------------
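# An entry that has both "config" and "location" keys is a single file description;
# anything else is treated as a category containing such entries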
def is_category(entry):
if "config" in entry and "location" in entry:
return False
return True
# TODO: Support the "after" to run the command after the copy was done
class Dotfiles():
def __init__(self, target, mode=TransferMode.localcopy):
self.target = target
if mode is not TransferMode.localcopy:
# Load SSH dependencies and helpers
sshfunctions = SSHFunctions()
# Load the SSH configuration and keys
sshconfig = SSHConfig(sshfunctions, target, mode)
if not sshconfig:
raise Exception("Error initializing the transfer - missing configuration!")
self.transfer = DotfilesTransferSSH(mode, sshconfig, sshfunctions)
else:
self.transfer = DotfilesTransferLocal(mode)
self.transfer.start()
# Create backups
def local_backup(self, source, dest):
logging.info('Copying {0} to {1}'.format(source, dest))
err_msg = 'Config is not at path: {0}'.format(path.abspath(source))
assert path.isfile(source), err_msg
shutil.copyfile(source, dest)
# TODO: Add SSH backups
def ssh_backup(self, source, dest):
pass
def backup(self, source, dest):
if self.transfer and self.transfer.mode is TransferMode.localcopy:
self.local_backup(source, dest)
else:
self.ssh_backup(source, dest)
def local_copy_file(self, options, entry):
if ignore_platform(entry, "msys", self.target.platform):
return
if ignore_platform(entry, "docker", self.target.platform):
return
        loc_path = os.path.join(self.target.path, entry['location'])
conf_full_path = path.abspath(path.expanduser(entry['config']))
loc_full_path = path.abspath(path.expanduser(loc_path))
if options.backup:
self.backup(loc_full_path, conf_full_path)
else:
err_msg = 'Config is not at path: {0}'.format(path.abspath(conf_full_path))
assert path.isfile(conf_full_path), err_msg
if options.copy:
self.transfer.copy(loc_full_path, conf_full_path)
else:
# Force rewriting the files, replace them
if options.replace:
replace_files(loc_full_path, conf_full_path)
# Do not replace existing files
else:
symlink_files(loc_full_path, conf_full_path)
def ssh_copy_file(self, options, entry):
# Process only whitelisted files!
if ignore_platform(entry, "ssh", "ssh"):
return
loc_path = entry['location']
conf_full_path = path.abspath(path.expanduser(entry['config']))
if options.backup:
self.backup(loc_path, conf_full_path)
self.transfer.copy(loc_path, conf_full_path)
# -----------------------------------------------------------------------------
# TODO: handle also the diffs between existing and repository files
# TODO: Handle exceptions in case of missing files, directories, etc
def local_copy(self, options, filelist):
script_location = path.abspath(__file__)
config_directory = path.dirname(script_location)
os.chdir(config_directory)
for k, v in filelist.items():
# Filelist entries can be pure entry or a category
if is_category(v):
logging.info("Processing category \"{0}\"".format(k))
for _, y in v.items():
self.local_copy_file(options, y)
else:
self.local_copy_file(options, v)
# -----------------------------------------------------------------------------
# TODO: handle also the diffs between existing and repository files
# TODO: Handle exceptions in case of missing files, directories, etc
def ssh_copy(self, options, filelist):
script_location = path.abspath(__file__)
config_directory = path.dirname(script_location)
os.chdir(config_directory)
for k, v in filelist.items():
# Filelist entries can be pure entry or a category
if is_category(v):
logging.info("Processing category \"{0}\"".format(k))
for _, y in v.items():
self.ssh_copy_file(options, y)
else:
self.ssh_copy_file(options, v)
def copy(self, options, filelist):
if self.transfer and self.transfer.mode is TransferMode.localcopy:
logging.info("Installing the dotfiles locally...")
self.local_copy(options, filelist)
else:
logging.info("Installing the dotfiles remotely...")
self.ssh_copy(options, filelist)
def close(self):
if self.transfer:
self.transfer.close()
# -----------------------------------------------------------------------------
@dataclass
class Target:
platform: str = sys.platform
path: str = "~"
machine: str = None
files: str = "configfiles.json"
@dataclass
class Options:
backup: bool = False
copy: bool = False
replace: bool = False
# -----------------------------------------------------------------------------
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--backup", action='store_true',
help="Backup the files from your $HOME")
parser.add_argument("-m", "--mode", type=TransferMode.argparse,
choices=list(TransferMode), default=TransferMode.localcopy,
help="Specify the transfer mode")
parser.add_argument("-c", "--copy", action='store_true',
help="Copy the files instead of creating symlinks")
parser.add_argument("-r", "--replace", action='store_true',
help="Force rewriting the files, replace them")
parser.add_argument("-l", "--list", action="store",
help="Specify the config files list")
parser.add_argument("-p", "--path", action="store",
help="Specify target path if different from $HOME")
parser.add_argument("-t", "--target", action="store",
help="Specify the target platform if different from current")
# And one more argument - the hostname, if the remote modes are chosen
parser.add_argument("machine", nargs="?",
help="Specify the machine name in case of remote installation")
args = parser.parse_args()
# Some defaults
target = Target()
# Non-default target options
if args.list is not None:
target.files = args.list
if args.target is not None:
target.platform = args.target
if args.path is not None:
target.path = args.path
if args.machine is not None:
target.machine = args.machine
# Set the operation options
options = Options()
options.backup = args.backup
options.copy = args.copy # Makes sense only for the local installations!
options.replace = args.replace
# Initialize the configuration, and a transfer, e.g. for SSH tunnel
dotfiles = Dotfiles(target, mode=args.mode)
# Load categories from JSON file
# TODO: Support also YAML
with open(target.files, "r") as json_file:
configs = json.load(json_file)
filelist = configs["configs"]
logging.info("Processing the {0} dotfiles list...".format(target.files))
dotfiles.copy(options, filelist)
# Close all connections gracefully if there were any
dotfiles.close()
@OldskoolOrion

That's my kinda script :-) neatly written... enjoyed browsing thru it - you definitely put some thought in it from the start, but kept it really readable 👍
