Skip to content

Instantly share code, notes, and snippets.

@thautwarm
Created February 10, 2023 05:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thautwarm/2ddb524401b744cdea2cdbdeb6ed817b to your computer and use it in GitHub Desktop.
Save thautwarm/2ddb524401b744cdea2cdbdeb6ed817b to your computer and use it in GitHub Desktop.
switching git users with configuration files
#!/usr/bin/env python
from __future__ import annotations
from pathlib import Path
from colorama import Fore, Style
from shutil import make_archive, unpack_archive
from contextlib import contextmanager
import io
import hashlib
import sys
import json
import wisepy2
import typing
import base64
import yaml
import struct
class Resource(typing.TypedDict):
is_dir: bool
data: str
class UserInfo(typing.TypedDict):
user: str
data: dict[str, Resource]
class Config(typing.TypedDict):
current_user: str
users: dict[str, UserInfo]
ssh_data_dir = Path("~").expanduser().joinpath(".dmgr").absolute()
ssh_data_dir.mkdir(parents=True, exist_ok=True, mode=0o700)
home = Path("~").expanduser().absolute()
def zip_directory_to_buf(directory: str):
"""Create a zip file in memory from a directory."""
tmp_dir = ssh_data_dir.joinpath("tmp")
tmp_dir.mkdir(parents=True, exist_ok=True, mode=0o700)
make_archive(tmp_dir.joinpath("tmp"), "zip", directory)
with open(tmp_dir.joinpath("tmp.zip"), "rb") as f:
data = f.read()
tmp_dir.joinpath("tmp.zip").unlink()
return data
def zip_buf_to_directory(data: bytes, target_directory: str):
"""Unpack a zip file from memory to a directory."""
tmp_dir = ssh_data_dir.joinpath("tmp")
tmp_dir.mkdir(parents=True, exist_ok=True, mode=0o700)
with open(tmp_dir.joinpath("tmp.zip"), "wb") as f:
f.write(data)
unpack_archive(tmp_dir.joinpath("tmp.zip"), target_directory)
tmp_dir.joinpath("tmp.zip").unlink()
@contextmanager
def with_config():
_datafile = ssh_data_dir.joinpath("config.json")
config: Config
if not _datafile.exists():
_datafile.write_text("{}", encoding='utf-8')
config = {}
else:
config = json.loads(_datafile.read_text(encoding='utf-8'))
try:
yield config
finally:
_datafile.write_text(
json.dumps(config, indent=4, ensure_ascii=False),
encoding='utf-8'
)
def visit_field_(c: dict, field: str, default):
if field in c:
return c[field]
c[field] = default
return default
def to_b64_str(s: bytes):
return base64.b64encode(s).decode('utf-8')
def from_b64_str(s: str):
return base64.b64decode(s.encode('utf-8'))
def create_resource(path: Path):
if path.is_dir():
data = zip_directory_to_buf(path)
return Resource(is_dir=True, data=to_b64_str(data))
data = path.read_bytes()
return Resource(is_dir=False, data=to_b64_str(data))
def from_resource(resource: Resource, path: str):
data = from_b64_str(resource["data"])
if resource["is_dir"]:
zip_buf_to_directory(data, path)
else:
Path(path).write_bytes(data)
class Main:
@staticmethod
def add(path: str):
file = Path(path).absolute()
if not file.exists():
print(Fore.YELLOW + f"File {path!r} does not exist." + Style.RESET_ALL)
sys.exit(1)
if not file.is_relative_to(home):
print(Fore.RED + f"File {path!r} is not in your home directory." + Style.RESET_ALL)
sys.exit(1)
with with_config() as config:
# package 'file' into a zip file in memory
user = visit_field_(config, "current_user", home.name)
user_info = visit_field_(visit_field_(config, "users", {}), user, {})
user_info['user'] = user
data = visit_field_(user_info, "data", {})
data[file.relative_to(home).as_posix()] = create_resource(file)
print(Fore.GREEN + f"Added file {path!r} to {user}'s config." + Style.RESET_ALL)
@staticmethod
def sync(user: str = ""):
with with_config() as config:
user = user or visit_field_(config, "current_user", home.name)
users = visit_field_(config, "users", {})
user_info = visit_field_(users, user, {})
user_info = typing.cast('UserInfo', user_info)
user_info['user'] = user
data = visit_field_(
user_info, "data",
typing.cast('dict[str, Resource]', {})
)
for path in list(data.keys()):
data[path] = create_resource(home.joinpath(path))
print(Fore.GREEN + f"Synced data for user {user!r}." + Style.RESET_ALL)
@staticmethod
def create(user: str):
Main.sync()
with with_config() as config:
users = visit_field_(config, "users", {})
if user in users:
print(Fore.YELLOW + f"User {user!r} already exists." + Style.RESET_ALL)
else:
_ = visit_field_(users, user, UserInfo(user=user, data={}))
print(Fore.GREEN + f"Created user {user!r}." + Style.RESET_ALL)
@staticmethod
def switch(user: str):
Main.sync()
with with_config() as config:
user = user
users = visit_field_(config, "users", {})
if user not in users:
print(Fore.RED + f"User {user!r} does not exist." + Style.RESET_ALL)
sys.exit(1)
user_info = visit_field_(users, user, {})
user_info['user'] = user
data = visit_field_(user_info, "data", {})
for path, resource in data.items():
from_resource(resource, home.joinpath(path).as_posix())
config["current_user"] = user
print(Fore.GREEN + f"Switched to user {user!r}." + Style.RESET_ALL)
@staticmethod
def show(sync: bool = False):
if sync:
Main.sync()
with with_config() as config:
buf = io.StringIO()
yaml.safe_dump(_show_config(config), buf)
print(buf.getvalue())
def ident(x: str):
maxi = 2 ** 32
z = 1283
for c in from_b64_str(x):
z = (z * (1 + c) + 7) % maxi
return z
def _show_resource(r: Resource):
if r["is_dir"]:
return "/"
else:
return ''
def _show_user_info(user_info: UserInfo):
files = {}
for k, v in user_info.get("data", {}).items():
files[k + _show_resource(v)] = ident(v["data"])
return {'user': user_info['user'], 'items': files}
def _show_config(config: Config):
current_user = config.get("current_user", home.name)
users = config.get("users", {})
results = {}
for k, v in users.items():
results[k] = _show_user_info(v)
return {'current_user': current_user, 'users': results}
wisepy2.wise(Main)()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment