Created
May 20, 2024 21:31
-
-
Save s3rgeym/74787da20875db21462be14aec8618ef to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import annotations | |
import argparse | |
import re | |
import sys | |
from functools import partial | |
from pathlib import Path | |
from types import SimpleNamespace | |
from typing import Any, Sequence | |
import pymysql | |
import yaml | |
CSI = "\x1b[" | |
RESET = f"{CSI}m" | |
RED = f"{CSI}31m" | |
GREEN = f"{CSI}32m" | |
YELLOW = f"{CSI}33m" | |
BLUE = f"{CSI}34m" | |
PURPLE = f"{CSI}35m" | |
print_err = partial(print, file=sys.stderr, flush=True) | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-p", "--path", type=Path, default=Path(".")) | |
parser.add_argument("-C", "--connect-timeout", type=float, default=10.0) | |
class NameSpace(argparse.Namespace): | |
path: Path | |
connect_timeout: float | |
MYSQL_IMAGE_RE = re.compile("mysql|mariadb") | |
DB_KEYS_RE = re.compile( | |
r"(DB|MYSQL|POSTGRES(QL)?|PG)\w*(HOST|USER|PASS|PWD|PORT|DATABASE|DB)" | |
) | |
def handle_compose(file_path: Path, args: NameSpace) -> None: | |
for name, data in get_compose_services(file_path).items(): | |
check_compose_service(data, name, file_path.parent.name, args) | |
def check_compose_service( | |
data: dict[str, Any], service_name: str, default_hostname: str, args: NameSpace | |
) -> None: | |
print_err(f"{BLUE}[D] check service: {service_name}{RESET}") | |
env_keys = data.get("environment", {}) | |
if isinstance(env_keys, list): | |
env_keys = dict(x.split("=", 1) for x in env_keys) | |
db_keys = {k: env_value(v) for k, v in env_keys.items() if DB_KEYS_RE.search(k)} | |
if not db_keys: | |
return | |
if is_mysql(data, db_keys): | |
db_config = DBConfig.from_env_dict(db_keys) | |
db_config.host = db_config.host or default_hostname | |
db_config.port = db_config.port or find_host_port(3306, data) | |
try: | |
pymysql.connect( | |
host=db_config.host, | |
user=db_config.username, | |
password=db_config.password, | |
database=db_config.database, | |
connect_timeout=args.connect_timeout, | |
) | |
print_err(f"{GREEN}[OK] connection succeeded: {db_config=}{RESET}") | |
except Exception as error: | |
print_err(f"{RED}[FAIL] connection failed: {db_config=}, {error=}{RESET}") | |
class SubMatch(str): | |
__eq__ = str.__contains__ | |
class DBConfig(SimpleNamespace): | |
database: str = None | |
host: str = None | |
password: str = None | |
port: int = None | |
username: str = "root" | |
@classmethod | |
def from_env_dict(cls, env_dict: dict[str, str]) -> DBConfig: | |
print_err(f"{BLUE}[D] {env_dict=}{RESET}") | |
c = cls() | |
for k, v in env_dict.items(): | |
match SubMatch(k): | |
case "HOST": | |
c.host = v | |
case "PORT": | |
c.port = int(v) | |
case "USER": | |
c.username = v | |
case "PASS" | "PWD": | |
c.password = v | |
case "DB" | "DATABASE": | |
c.database = v | |
case _: | |
print_err(f"{RED}[!] unknown key: {k}{RESET}") | |
return c | |
def find_host_port(container_port: int, data: dict) -> int | None: | |
if v := next( | |
(x for x in data.get("ports", []) if x.endswith(f":{container_port}")), None | |
): | |
try: | |
# ${FORWARDED_MYSQL_PORT:-3306}:3306 | |
return int(env_value(v.rsplit(":", 1)[0])) | |
except ValueError as e: | |
print_err(f"{RED}[!] {e}{RESET}") | |
return None | |
def is_mysql(data: dict, db_keys: dict) -> bool: | |
if v := data.get("image"): | |
if MYSQL_IMAGE_RE.search(v): | |
return True | |
if any(x.endswith(":3306") for x in data.get("ports", [])): | |
return True | |
if any("MYSQL" in key for key in db_keys): | |
return True | |
return False | |
def env_value(value: str) -> str: | |
""" | |
>>> env_value('${DB_NAME:-wordpress}') | |
wordpress | |
""" | |
if isinstance(value, str) and value[:2] == "${" and value[-1] == "}": | |
return value[2:-1].split(":-", 1)[1] | |
return value | |
def get_compose_services(file_path) -> dict: | |
with file_path.open() as stream: | |
try: | |
data = yaml.safe_load(stream) | |
return data["services"] | |
except Exception as e: | |
print_err(f"{RED}[!] {e}{RESET}") | |
return {} | |
def handle_env(file_path: Path, args: NameSpace) -> None: ... | |
HANDLERS_MAP = { | |
"**/*compose*.yml": handle_compose, | |
"**/*.env": handle_env, | |
} | |
def main(argv: Sequence[str] = sys.argv[1:]) -> None: | |
args: NameSpace = parser.parse_args(argv, namespace=NameSpace()) | |
for pat, hdlr in HANDLERS_MAP.items(): | |
for path in args.path.glob(pat): | |
print_err(f"{BLUE}[D] handle {path!s}{RESET}") | |
hdlr(path, args) | |
if __name__ == "__main__": | |
sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment