Skip to content

Instantly share code, notes, and snippets.

@s3rgeym
Created May 20, 2024 21:31
Show Gist options
  • Save s3rgeym/74787da20875db21462be14aec8618ef to your computer and use it in GitHub Desktop.
Save s3rgeym/74787da20875db21462be14aec8618ef to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from __future__ import annotations
import argparse
import re
import sys
from functools import partial
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Sequence
import pymysql
import yaml
CSI = "\x1b["
RESET = f"{CSI}m"
RED = f"{CSI}31m"
GREEN = f"{CSI}32m"
YELLOW = f"{CSI}33m"
BLUE = f"{CSI}34m"
PURPLE = f"{CSI}35m"
print_err = partial(print, file=sys.stderr, flush=True)
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--path", type=Path, default=Path("."))
parser.add_argument("-C", "--connect-timeout", type=float, default=10.0)
class NameSpace(argparse.Namespace):
path: Path
connect_timeout: float
MYSQL_IMAGE_RE = re.compile("mysql|mariadb")
DB_KEYS_RE = re.compile(
r"(DB|MYSQL|POSTGRES(QL)?|PG)\w*(HOST|USER|PASS|PWD|PORT|DATABASE|DB)"
)
def handle_compose(file_path: Path, args: NameSpace) -> None:
for name, data in get_compose_services(file_path).items():
check_compose_service(data, name, file_path.parent.name, args)
def check_compose_service(
data: dict[str, Any], service_name: str, default_hostname: str, args: NameSpace
) -> None:
print_err(f"{BLUE}[D] check service: {service_name}{RESET}")
env_keys = data.get("environment", {})
if isinstance(env_keys, list):
env_keys = dict(x.split("=", 1) for x in env_keys)
db_keys = {k: env_value(v) for k, v in env_keys.items() if DB_KEYS_RE.search(k)}
if not db_keys:
return
if is_mysql(data, db_keys):
db_config = DBConfig.from_env_dict(db_keys)
db_config.host = db_config.host or default_hostname
db_config.port = db_config.port or find_host_port(3306, data)
try:
pymysql.connect(
host=db_config.host,
user=db_config.username,
password=db_config.password,
database=db_config.database,
connect_timeout=args.connect_timeout,
)
print_err(f"{GREEN}[OK] connection succeeded: {db_config=}{RESET}")
except Exception as error:
print_err(f"{RED}[FAIL] connection failed: {db_config=}, {error=}{RESET}")
class SubMatch(str):
__eq__ = str.__contains__
class DBConfig(SimpleNamespace):
database: str = None
host: str = None
password: str = None
port: int = None
username: str = "root"
@classmethod
def from_env_dict(cls, env_dict: dict[str, str]) -> DBConfig:
print_err(f"{BLUE}[D] {env_dict=}{RESET}")
c = cls()
for k, v in env_dict.items():
match SubMatch(k):
case "HOST":
c.host = v
case "PORT":
c.port = int(v)
case "USER":
c.username = v
case "PASS" | "PWD":
c.password = v
case "DB" | "DATABASE":
c.database = v
case _:
print_err(f"{RED}[!] unknown key: {k}{RESET}")
return c
def find_host_port(container_port: int, data: dict) -> int | None:
if v := next(
(x for x in data.get("ports", []) if x.endswith(f":{container_port}")), None
):
try:
# ${FORWARDED_MYSQL_PORT:-3306}:3306
return int(env_value(v.rsplit(":", 1)[0]))
except ValueError as e:
print_err(f"{RED}[!] {e}{RESET}")
return None
def is_mysql(data: dict, db_keys: dict) -> bool:
if v := data.get("image"):
if MYSQL_IMAGE_RE.search(v):
return True
if any(x.endswith(":3306") for x in data.get("ports", [])):
return True
if any("MYSQL" in key for key in db_keys):
return True
return False
def env_value(value: str) -> str:
"""
>>> env_value('${DB_NAME:-wordpress}')
wordpress
"""
if isinstance(value, str) and value[:2] == "${" and value[-1] == "}":
return value[2:-1].split(":-", 1)[1]
return value
def get_compose_services(file_path) -> dict:
with file_path.open() as stream:
try:
data = yaml.safe_load(stream)
return data["services"]
except Exception as e:
print_err(f"{RED}[!] {e}{RESET}")
return {}
def handle_env(file_path: Path, args: NameSpace) -> None: ...
HANDLERS_MAP = {
"**/*compose*.yml": handle_compose,
"**/*.env": handle_env,
}
def main(argv: Sequence[str] = sys.argv[1:]) -> None:
args: NameSpace = parser.parse_args(argv, namespace=NameSpace())
for pat, hdlr in HANDLERS_MAP.items():
for path in args.path.glob(pat):
print_err(f"{BLUE}[D] handle {path!s}{RESET}")
hdlr(path, args)
if __name__ == "__main__":
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment