Skip to content

Instantly share code, notes, and snippets.

@Radiergummi
Last active July 17, 2023 07:40
Show Gist options
  • Save Radiergummi/fe14c4ed93c68f2928a6a275c8579404 to your computer and use it in GitHub Desktop.
Save Radiergummi/fe14c4ed93c68f2928a6a275c8579404 to your computer and use it in GitHub Desktop.
Docker Swarm Deployment
"""Entry point: set up logging, load settings, and run the deployment."""
from logging import getLogger
from os import getenv

from runner.deployment import main
from runner.settings import initialize_logging, load_settings

# Debug logging is opt-in via the DEBUG environment variable.
initialize_logging(getenv("DEBUG", "false") == "true")
logger = getLogger("runner")

try:
    settings = load_settings()
    logger.info("Starting deployment")
    main(settings)
    logger.info("Deployment complete")
except RuntimeError as error:
    # Deployment errors are reported as a plain message and a non-zero
    # exit status rather than a traceback.
    message = f"Deployment failed: {error}"
    logger.error(message)  # noqa: TRY400
    raise SystemExit(1) from error

This is the deployment runner script we use in our BitBucket pipelines. It does the following things, in this order:

  1. Sign in to Docker Hub
    So we can download images and propagate authentication to Swarm nodes. This uses the connection configured in DOCKER_HOST (defaulting to the Docker socket), so you can also run this within a deployment container - see below for more.
  2. Expand secrets and configs
    To configure services, we can use configs and secrets to let the swarm engine mount those values securely in service containers.
    This will scan the compose spec for secrets and configs, search for adjacently named environment variables or text files, expand variables into text files, and modify the compose file to use those values.
  3. Reconcile the schema
    Annoyingly, Docker Compose and Docker Stack use a different compose file parser (see here for details). This leads to some subtle incompatibilities in the schema that prevent deploying stacks that are otherwise valid compose specs. This step fixes those incompatibilities.
  4. Deploy the stack
    This simply wraps the docker stack deploy command with the correct settings and environment, and parses the output to check for errors.
  5. Prune secrets and configs
  Secrets and configs can be created, but never updated in Docker. This means that you'll need to use unique names in each deployment, or it will fail. In step 2, the runner has generated secret names (configs too, but I'll refer to just secrets for simplicity) from the hash of the secret's content. This means a new secret will only be created in the Swarm if your secrets change. In this step, we're purging outdated (that is, unused) secrets from the Swarm.
  6. Log out
    Cleaning up, we'll log out of Docker Hub to prevent any credentials from being persisted for too long.

Variable interpolation

When reconciling the compose spec, docker will interpolate environment variable references in the file. This means that you can use variables in values, like image: nginx:${VERSION:-1.4}. We make extensive use of that to have deployment-specific values in the stack. Namely, we use the following variables:

  • SERVICE: The name of the stack being deployed. This is also used to assign secrets and configs to individual stacks, and purge them later. Defaults to the repository slug.
  • VERSION: The currently deployed revision, defaulting to the build number.
  • IMAGE_TAG: The main tag for the Docker image of the application we're creating a deployment for. This defaults to the short git hash. We don't actually use this in the runner, but pass it along to the spec interpolation step.

There are some more configuration variables and parameters, see settings.py.

References to "Matchory"

This file has been created by Matchory GmbH, and some of the labels used refer to our company name. You can change those references if you like, but they have no influence on functionality.

References to BitBucket

This script was written for deployments from BitBucket pipelines. Therefore, it makes some assumptions on default variables in the environment -- namely, the workspace, repository name, deployment ID, and so on. Unless you're using BitBucket, you'll probably want to replace those variables with equivalents for your CI.

from argparse import Namespace
from json import dumps as dump_json
from json import loads as parse_json
from logging import getLogger
from os import environ
from pathlib import Path
from subprocess import PIPE, CalledProcessError, check_output, run
from typing import Literal, TypedDict
from docker import DockerClient
from docker.errors import APIError
# Module-level logger for this file, plus a child logger used to relay
# output captured from Docker CLI subprocesses.
logger = getLogger(__name__)
docker_logger = logger.getChild("docker")
class Variable(TypedDict):
    """A secret or config entry as it appears in a normalized compose spec.

    NOTE(review): the runner tests membership ("file" in variable, ...)
    before access, so these keys are effectively optional; they are declared
    required here pending NotRequired support (see the TODO in the schema
    reconciliation code).
    """

    name: str | None
    file: str | None
    environment: str | None
    external: bool
    labels: dict[str, str] | None
class _ComposeSpecSecret(Variable):
    """Secret entry of a compose spec; identical in shape to Variable."""

    pass
class _ComposeSpecConfig(Variable):
    """Config entry of a compose spec; identical in shape to Variable."""

    pass
class _ComposeSpecServicePort(TypedDict):
    """Port mapping of a service in the normalized compose spec.

    NOTE(review): compose normalization may emit port numbers as strings;
    the schema reconciliation step casts them to int for Swarm.
    """

    mode: Literal["ingress"] | Literal["host"]
    host_ip: str
    target: int | str
    published: str
    protocol: Literal["tcp"] | Literal["udp"]
class _ComposeSpecService(TypedDict):
    """Service definition (partial) of the normalized compose spec.

    Only the keys the runner actually inspects are declared here.
    """

    ports: list[_ComposeSpecServicePort] | None
    entrypoint: str | list[str] | None
    command: str | list[str] | None
class ComposeSpec(TypedDict):
    """Top-level structure of a normalized compose specification."""

    name: str | None
    version: str | None
    services: dict[str, _ComposeSpecService]
    secrets: dict[str, _ComposeSpecSecret] | None
    configs: dict[str, _ComposeSpecConfig] | None
def resolve_compose_file(settings: Namespace) -> str | None:
    """
    Locate a compose file in the current working directory.

    Probes a fixed list of well-known file names, most specific first, and
    returns the first one that exists.

    :param settings: Application settings (currently unused).
    :return: Name of the first matching compose file, or None if none exist.
    """
    candidates = (
        "docker-compose.production.yaml",
        "docker-compose.production.yml",
        "docker-compose.prod.yaml",
        "docker-compose.prod.yml",
        "docker-compose.yaml",
        "docker-compose.yml",
    )
    base = Path.cwd()

    for candidate in candidates:
        logger.debug("Testing for %s...", candidate)

        if (base / candidate).exists():
            logger.debug("Found variant %s", candidate)
            return candidate

    return None
def normalize_spec(settings: Namespace) -> ComposeSpec:
    """
    Normalize a compose specification by running it through docker-compose.

    Processes an arbitrary number of compose files, merges them, and
    normalizes all values into a single specification, which gives us a
    known schema to apply transformations to. As a side effect, compose
    interpolates environment variable references, which is why the values
    from the settings are pushed into the subprocess environment.

    :param settings: Application settings.
    :return: Compose specification.
    :raises RuntimeError: If docker-compose exits with an error.
    """
    environment = {
        **environ,
        "DOCKER_HOST": settings.docker_host,
        "SERVICE": settings.service,
        "VERSION": settings.version,
        "IMAGE_TAG": settings.image_tag,
        "DEPLOY_IMAGE": settings.image_tag,
        "COMPOSE_FILE": settings.compose_file,
    }

    try:
        output = check_output(
            ["docker-compose", "config", "--format", "json"],
            env=environment,
            stderr=PIPE,
        )
    except CalledProcessError as error:
        stderr = error.stderr.decode("utf-8")
        message = f"Failed to normalize compose specification: {stderr}"
        raise RuntimeError(message) from error

    return parse_json(output)
def stack_deploy(settings: Namespace, spec: ComposeSpec) -> None:
    """
    Deploy a Docker stack to the swarm.

    Wraps the command-line `docker stack deploy` command, for which the
    Docker SDK provides no equivalent function.

    :param settings: Application settings.
    :param spec: Compose specification to deploy.
    :return: None.
    :raises RuntimeError: If the deploy command exits with an error.
    """
    # Serialize the spec to JSON: luckily, stack deploy accepts JSON just
    # fine, so there's no need to work with YAML here. Indentation is purely
    # for readability if the spec gets printed during debugging.
    payload = dump_json(spec, indent=4)
    logger.debug("Deploying stack from generated compose file:\n%s", payload)

    command = [
        "docker",
        "stack",
        "deploy",
        "--compose-file",
        "-",
        "--orchestrator",
        "swarm",
        "--resolve-image",
        "always",
        "--with-registry-auth",
        "--prune",
        settings.service,
    ]
    process = run(
        command,
        input=payload.encode("utf-8"),
        capture_output=True,
        shell=False,
        env={
            **environ,
            "DOCKER_HOST": settings.docker_host,
            "COMPOSE_FILE": settings.compose_file,
        },
    )

    # Relay the CLI output through our own logging setup.
    for line in filter(None, process.stdout.decode("utf-8").split("\n")):
        docker_logger.info(line)

    for line in filter(None, process.stderr.decode("utf-8").split("\n")):
        docker_logger.error(line)

    if process.returncode > 0:
        message = "Stack deployment failed"
        raise RuntimeError(message)
def reconcile_schema(spec: "ComposeSpec") -> "ComposeSpec":
    """
    Reconcile the compose specification schema of Compose and Swarm.

    As Swarm doesn't use the modern compose specification implementation but
    rolls its own, it isn't completely compatible to what compose does. As
    brain-dead as this is, considering both are developed by the same company,
    it's what we have.
    Thus, this function attempts to reconcile all differences and convert a
    valid compose schema to a valid Swarm schema. The spec is modified in
    place and also returned for convenience.

    :param spec: Compose Specification to convert
    :return: Swarm-compatible compose specification
    """
    # Add the compose version. It is gone from the modern compose schema,
    # but required by Docker Swarm, so we set the newest supported version.
    if "version" not in spec:
        spec["version"] = "3.9"

    # Remove any name key. Swarm doesn't support it.
    if "name" in spec:
        # TODO: Convert the `name` property to the NotRequired type once
        #  Python 3.11 is available on the Temurin Docker image.
        # noinspection PyTypedDict
        del spec["name"]

    for service in spec["services"].values():
        # Compose normalization may emit port numbers as strings, which
        # Swarm rejects, so cast them to integers. (The previous comment
        # claimed the opposite direction — the code casts to int.)
        # TODO: This appears to have been fixed in more recent Docker versions
        if "ports" in service:
            for port in service["ports"]:
                port["target"] = int(port["target"])
                port["published"] = int(port["published"])

        # Normalization may also emit explicit nulls for entrypoint and
        # command; Swarm chokes on those, so drop the keys entirely.
        if "entrypoint" in service and service["entrypoint"] is None:
            # noinspection PyTypedDict
            del service["entrypoint"]

        if "command" in service and service["command"] is None:
            # noinspection PyTypedDict
            del service["command"]

    return spec
def login(client: DockerClient, settings: Namespace) -> None:
    """
    Login to the Docker registry.

    Authenticates with the Docker registry using the credentials from the
    settings, both via the SDK client and the docker CLI. If no credentials
    are provided, login will be skipped.

    :param client: Docker client.
    :param settings: Application settings.
    :return: None.
    """
    if not settings.docker_hub_credentials:
        logger.debug("No Docker registry credentials provided: Skipping login")
        return

    username: str
    password: str
    username, password = settings.docker_hub_credentials

    # Fall back to the daemon's default registry if none is configured.
    registry = settings.docker_registry or client.info().get("IndexServerAddress")

    _client_login(client, username, password, registry)
    _shell_login(settings, username, password, registry)
def logout(client: DockerClient, settings: Namespace) -> None:
    """
    Logout from the Docker registry.

    This will remove any stored credentials for the Docker registry. If no
    credentials are provided, logout will be skipped.

    :param client: Docker client.
    :param settings: Application settings.
    :return: None.
    """
    if not settings.docker_hub_credentials:
        # BUG FIX: this message previously said "Skipping login" — copied
        # from login() above.
        logger.debug("No Docker registry credentials provided: Skipping logout")
        return

    # Fall back to the daemon's default registry if none is configured.
    registry = settings.docker_registry or client.info().get("IndexServerAddress")

    _shell_logout(settings, registry)
def _client_login(
    client: DockerClient,
    username: str,
    password: str,
    registry: str,
) -> None:
    """Authenticate the SDK client against the given registry.

    :raises RuntimeError: On rejected credentials (HTTP 401) or an
        unexpected daemon response; any other API error is re-raised as-is.
    """
    try:
        response = client.login(username, password, registry=registry, reauth=True)
    except APIError as error:
        # Only a 401 is translated into a deployment error; everything else
        # bubbles up unchanged.
        if error.status_code != 401:
            raise

        message = (
            f"Failed to log into Docker registry at {registry}: "
            + error.explanation
        )
        raise RuntimeError(message) from error

    if "Status" not in response:
        message = "Unexpected response from Docker daemon: " + repr(response)
        raise RuntimeError(message)

    logger.info(
        "Logged into Docker registry at %s as %s: %s",
        registry,
        username,
        response["Status"],
    )
def _shell_login(
    settings: Namespace,
    username: str,
    password: str,
    registry: str,
) -> None:
    """Authenticate the docker CLI against the given registry.

    The password is supplied on stdin (--password-stdin) so it never
    appears in the process list.

    :raises RuntimeError: If the login command exits non-zero.
    """
    command = [
        "docker",
        "login",
        "--password-stdin",
        "--username",
        username,
        registry,
    ]
    process = run(
        command,
        input=password.encode("utf-8"),
        capture_output=True,
        shell=False,
        timeout=float(settings.docker_timeout),
        env={
            **environ,
            "DOCKER_HOST": settings.docker_host,
        },
    )

    # Relay the CLI output through our own logging setup.
    for line in filter(None, process.stdout.decode("utf-8").split("\n")):
        docker_logger.info(line)

    for line in filter(None, process.stderr.decode("utf-8").split("\n")):
        docker_logger.error(line)

    if process.returncode > 0:
        message = "Login failed"
        raise RuntimeError(message)
def _shell_logout(
    settings: Namespace,
    registry: str,
) -> None:
    """Log the docker CLI out of the given registry.

    :raises RuntimeError: If the logout command exits non-zero.
    """
    process = run(
        ["docker", "logout", registry],
        capture_output=True,
        shell=False,
        timeout=float(settings.docker_timeout),
        env={
            **environ,
            "DOCKER_HOST": settings.docker_host,
        },
    )

    # Relay the CLI output through our own logging setup.
    for line in filter(None, process.stdout.decode("utf-8").split("\n")):
        docker_logger.info(line)

    for line in filter(None, process.stderr.decode("utf-8").split("\n")):
        docker_logger.error(line)

    if process.returncode > 0:
        message = "Logout failed"
        raise RuntimeError(message)
#!/usr/bin/env sh
# Wrapper script for the Python runner.
# Quote "$@" so arguments containing whitespace or globs are forwarded
# intact, and exec so the runner replaces the shell and its exit status
# propagates to the caller.
exec python -m runner "$@"
from argparse import Namespace
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from hashlib import sha256
from logging import Logger, getLogger
from os import getenv
from pathlib import Path
from re import sub

from docker import DockerClient
from docker.errors import APIError
from pytz import UTC
from runner.compose import (
ComposeSpec,
Variable,
login,
logout,
normalize_spec,
reconcile_schema,
stack_deploy,
)
# Module-level logger; child loggers (e.g. "pruning") are derived from it.
logger = getLogger(__name__)
def main(settings: Namespace):
    """Run a full deployment: login, expand variables, deploy, prune, logout.

    :param settings: Application settings.
    """
    runner_version = getenv("__DEPLOYMENT_RUNNER_VERSION", "1.0.0")
    client = DockerClient(
        base_url=settings.docker_host,
        version=settings.docker_api_version or "auto",
        user_agent=f"matchory-deployment-runner/{runner_version} (linux-shell)",
    )
    login(client, settings)

    compose_spec = normalize_spec(settings)

    # Expand secrets and configs in place, rewriting each entry to its
    # content-addressed form (see _process_variable).
    for section in ("secrets", "configs"):
        entries = compose_spec.get(section) or {}

        for entry_name, entry in entries.items():
            entries[entry_name] = _process_variable(entry_name, entry, settings)

    compose_spec = reconcile_schema(compose_spec)
    stack_deploy(settings, compose_spec)
    prune_variables(compose_spec, client, settings)
    logout(client, settings)
def _process_variable(name: str, variable: Variable, settings: Namespace) -> Variable:
"""
Process a variable.
:param name: Name of the variable.
:param variable: Variable metadata as defined in the compose spec.
:param settings: Application settings.
:return: Modified variable metadata.
"""
variant_upper = name.upper()
variant_prefix = f"{settings.env_var_prefix}_{variant_upper}"
env_variable = (
getenv(variable["environment"])
if "environment" in variable
else getenv(variant_upper, getenv(variant_prefix))
)
path = Path(variable["file"]) if "file" in variable else None
# Make sure we retain the ability to add secrets and configs from local files, not
# just environment variables. If the file specified in the variable exists, we
# assume it takes precedence over any environment variable with the same name.
if path:
if not path.exists():
logger.debug("Expanding %s to %s", name, path)
if env_variable is None:
raise VariableNotDefinedError(name, (variant_upper, variant_prefix))
path.write_text(env_variable.strip(), encoding="utf-8")
else:
logger.info(
"Skipping secret expansion for %s: Existing file takes precedence "
"over variable defined in environment",
name,
)
else:
if not env_variable:
raise VariableNotDefinedError(name, (variable["environment"]))
# Calculate the hash of the variable file: It will stay the same for subsequent
# deployments if the actual value didn't change between them, which means we don't
# have to invalidate a secret or config pointlessly.
payload_hash = sha256(
path.read_bytes() if path else env_variable.encode("utf-8"),
).hexdigest()[:7]
if "labels" not in variable:
variable["labels"] = {}
# By storing these values as labels, we can query for matching variables later on:
# This enables us to prune outdated versions automatically.
variable["labels"]["com.matchory.service"] = settings.service
variable["labels"]["com.matchory.version"] = settings.version
variable["labels"]["com.matchory.hash"] = payload_hash
# Again, compatibility with legacy deployment runner versions
if variable["name"].startswith(settings.service):
variable["name"] = sub(
pattern=rf"^{settings.service}[_-]",
repl="",
string=variable["name"],
)
if variable["name"].endswith(settings.version):
variable["name"] = sub(
pattern=rf"[_-]{settings.version}$",
repl="",
string=variable["name"],
)
variable["name"] = f"{settings.service}-{variable['name']}-{payload_hash}"
return variable
def prune_variables(
    compose_spec: ComposeSpec,
    client: DockerClient,
    settings: Namespace,
):
    """Remove outdated secrets and configs of this service from the Swarm."""
    pruning = logger.getChild("pruning")

    _prune_secrets(compose_spec, client, pruning, settings)
    _prune_configs(compose_spec, client, pruning, settings)
def _prune_secrets(
    compose_spec: ComposeSpec,
    client: DockerClient,
    prune_logger: Logger,
    settings: Namespace,
):
    """
    Prune outdated Docker secrets belonging to this service.

    Lists all swarm secrets labelled for this service, removes those not
    referenced by the current compose spec (superseded content hashes) or
    lacking a hash label entirely, and warns about secrets older than 30
    days. Finally, removes "legacy" secrets that match the service by name
    (pre-labelling runner versions) but were not found via the label filter.

    :param compose_spec: Spec whose secret names are currently in use.
    :param client: Docker client.
    :param prune_logger: Logger to report pruning activity on.
    :param settings: Application settings.
    """
    prune_logger.debug("Pruning secrets for service %s", settings.service)
    # Content-addressed names currently referenced by the spec; anything
    # labelled for this service but missing here is outdated.
    spec_secrets = (
        [secret["name"] for secret in compose_spec["secrets"].values()]
        if "secrets" in compose_spec
        else []
    )
    secrets = client.secrets.list(
        filters={
            "label": f"com.matchory.service={settings.service}",
        },
    )
    if len(secrets) > 0:
        prune_logger.debug(
            "Checking %d secret(s) for service %s",
            len(secrets),
            settings.service,
        )
    for i, secret in enumerate(secrets):
        prune_logger.debug(
            "Checking secret %d/%d: %s",
            i + 1,
            len(secrets),
            secret.name,
        )
        # Secrets created by this runner always carry a hash label; anything
        # without one is malformed and removed outright.
        if "com.matchory.hash" not in secret.attrs["Spec"]["Labels"]:
            prune_logger.warning(
                "Found invalid secret '%s': Missing hash label",
                secret.name,
            )
            secret.remove()
            continue
        # Strip the service prefix and hash suffix to recover the plain
        # secret name as used in the spec.
        spec_name = sub(
            pattern=rf"^{settings.service}[_-](.+)[_-].[^_-]+$",
            repl=r"\1",
            string=secret.name,
        )
        if secret.name not in spec_secrets:
            secret_hash = secret.attrs["Spec"]["Labels"]["com.matchory.hash"]
            prune_logger.debug(
                "Pruning outdated version %s of secret %s: %s",
                secret_hash,
                spec_name,
                secret.name,
            )
            secret.remove()
        # NOTE(review): the original indentation was lost; this rotation
        # check is assumed to apply to every labelled secret, not only
        # pruned ones — confirm against upstream.
        created_at = parse_date_string(secret.attrs["CreatedAt"])
        delta = timedelta(days=30)
        if created_at < datetime.now(tz=UTC) - delta:
            prune_logger.warning(
                "Secret '%s' has been in use for more than 30 days and should "
                "be rotated!",
                spec_name,
            )
    # Legacy secrets: matched by name prefix, excluding everything already
    # handled via the label filter above.
    secret_names = [secret.name for secret in secrets]
    legacy_secrets = [
        secret
        for secret in client.secrets.list(
            filters={
                "name": settings.service,
            },
        )
        if secret.name not in secret_names
    ]
    if len(legacy_secrets) > 0:
        prune_logger.debug(
            "Pruning %d legacy secret(s) for service %s",
            len(legacy_secrets),
            settings.service,
        )
    for secret in legacy_secrets:
        prune_logger.info("Pruning legacy secret %s", secret.name)
        try:
            secret.remove()
        except APIError:
            # Best effort: log and continue with the remaining secrets.
            prune_logger.exception(
                "Failed to prune legacy secret %s",
                secret.name,
            )
def _prune_configs(
    compose_spec: ComposeSpec,
    client: DockerClient,
    prune_logger: Logger,
    settings: Namespace,
):
    """
    Prune outdated Docker configs belonging to this service.

    Mirrors _prune_secrets: lists all swarm configs labelled for this
    service, removes those not referenced by the current compose spec or
    lacking a hash label, warns about configs older than 30 days, and
    finally removes legacy configs matched by name only.

    :param compose_spec: Spec whose config names are currently in use.
    :param client: Docker client.
    :param prune_logger: Logger to report pruning activity on.
    :param settings: Application settings.
    """
    prune_logger.debug("Pruning configs for service %s", settings.service)
    # Content-addressed names currently referenced by the spec; anything
    # labelled for this service but missing here is outdated.
    spec_configs = (
        [config["name"] for config in compose_spec["configs"].values()]
        if "configs" in compose_spec
        else []
    )
    configs = client.configs.list(
        filters={
            "label": f"com.matchory.service={settings.service}",
        },
    )
    if len(configs) > 0:
        prune_logger.debug(
            "Checking %d config(s) for service %s",
            len(configs),
            settings.service,
        )
    for i, config in enumerate(configs):
        prune_logger.debug(
            "Checking config %d/%d: %s",
            i + 1,
            len(configs),
            config.name,
        )
        # Configs created by this runner always carry a hash label; anything
        # without one is malformed and removed outright.
        if "com.matchory.hash" not in config.attrs["Spec"]["Labels"]:
            prune_logger.warning(
                "Found invalid config '%s': Missing hash label",
                config.name,
            )
            config.remove()
            continue
        # Strip the service prefix and hash suffix to recover the plain
        # config name as used in the spec.
        spec_name = sub(
            pattern=rf"^{settings.service}[_-](.+)[_-].[^_-]+$",
            repl=r"\1",
            string=config.name,
        )
        if config.name not in spec_configs:
            config_hash = config.attrs["Spec"]["Labels"]["com.matchory.hash"]
            prune_logger.debug(
                "Pruning outdated version %s of config %s: %s",
                config_hash,
                spec_name,
                config.name,
            )
            config.remove()
        created_at = parse_date_string(config.attrs["CreatedAt"])
        delta = timedelta(days=30)
        if created_at < datetime.now(tz=UTC) - delta:
            # BUG FIX: this warning previously said "Secret '%s'" — a
            # copy-paste remnant from _prune_secrets.
            prune_logger.warning(
                "Config '%s' has been in use for more than 30 days and should "
                "be rotated!",
                spec_name,
            )
    # Legacy configs: matched by name prefix, excluding everything already
    # handled via the label filter above.
    config_names = [config.name for config in configs]
    legacy_configs = [
        config
        for config in client.configs.list(
            filters={
                "name": settings.service,
            },
        )
        if config.name not in config_names
    ]
    if len(legacy_configs) > 0:
        prune_logger.debug(
            "Pruning %d legacy config(s) for service %s",
            len(legacy_configs),
            settings.service,
        )
    for config in legacy_configs:
        prune_logger.info("Pruning legacy config %s", config.name)
        try:
            config.remove()
        except APIError:
            # Best effort: log and continue with the remaining configs.
            prune_logger.exception(
                "Failed to prune legacy config %s",
                config.name,
            )
def parse_date_string(date_string: str) -> datetime:
    """
    Parse a timestamp string as provided by Docker Swarm.

    Docker reports timestamps as RFC 3339 UTC strings with up to nanosecond
    precision, e.g. "2023-07-17T07:40:00.123456789Z". strptime cannot handle
    more than six fractional digits, so the fraction is parsed separately
    and scaled by its actual digit count. (The previous implementation
    treated any fraction as microseconds — off by 1000x for nanosecond
    input —, crashed on fraction-less timestamps, and used astimezone(),
    which interprets the naive value in the *local* timezone despite the
    "Z" suffix marking it as UTC.)

    :param date_string: Date string to parse.
    :return: Parsed timezone-aware datetime in UTC.
    """
    timestamp = date_string.rstrip("Z")
    date_part, _, fraction = timestamp.partition(".")

    # The "Z" suffix marks the timestamp as UTC: attach the zone directly
    # instead of converting from local time.
    parsed = datetime.strptime(date_part, "%Y-%m-%dT%H:%M:%S").replace(
        tzinfo=timezone.utc,
    )

    if fraction:
        # Scale by digit count so "12", "123456", and "123456789" are all
        # interpreted correctly as fractions of a second.
        parsed += timedelta(seconds=int(fraction, 10) / 10 ** len(fraction))

    return parsed
class VariableNotDefinedError(RuntimeError):
    """Raised when no value can be resolved for a secret or config."""

    # Environment variable names that were checked, in order.
    variants: Iterable[str]

    # Name of the secret/config entry the lookup was performed for.
    name: str

    def __init__(self, name: str, variants: Iterable[str]):
        self.variants = variants
        self.name = name

    def __str__(self):
        """
        Retrieve the error message.

        :return: Human-readable description of the failed lookup.
        """
        return (
            f"Variable '{self.name}' is undefined: {self._variants} in the build "
            "environment. A deployment variable must be declared in the repository "
            "variables, deployment environment, or workspace variables. Consult the "
            "following reference for detailed instructions: "
            "https://support.atlassian.com/bitbucket-cloud/docs/variables-and-secrets"
        )

    @property
    def _variants(self):
        """Describe which variable variants were checked but undefined."""
        names = list(self.variants)

        if len(names) == 1:
            return f"'{names[0]}' is not defined"

        if len(names) == 2:
            return f"Neither '{names[0]}' nor '{names[1]}' are defined"

        if len(names) > 2:
            joined = "', '".join(names)
            return f"None of '{joined}' are defined"

        return "No suitable variants are defined"
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
from logging import config, getLogger
from os import getenv
from pathlib import Path
from git import InvalidGitRepositoryError, Repo
from runner.compose import resolve_compose_file
# Module-level logger for settings validation messages.
logger = getLogger(__name__)
def initialize_logging(debug: bool = False) -> None:
    """Configure the logging subsystem for the runner.

    Installs a stream handler with a tab-separated console format. The log
    level defaults to INFO (DEBUG in debug mode) and may be overridden via
    the LOG_LEVEL environment variable; urllib3 noise is capped at WARNING.

    :param debug: Whether to enable debug-level logging by default.
    """
    level = getenv("LOG_LEVEL", "DEBUG" if debug else "INFO")

    config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "console": {
                    "format": "%(name)s\t[%(levelname)s]\t%(message)s",
                    "datefmt": "%Y-%m-%d %H:%M:%S",
                },
            },
            "handlers": {
                "console": {
                    "class": "logging.StreamHandler",
                    "formatter": "console",
                },
            },
            "root": {
                "handlers": ["console"],
                "level": level,
            },
            "loggers": {
                "main": {
                    "handlers": ["console"],
                    "level": level,
                    "propagate": True,
                },
                "urllib3": {
                    "handlers": [],
                    "level": "WARNING",
                    "propagate": False,
                },
            },
        },
    )
def load_settings() -> Namespace:
    """
    Load runner settings from command-line arguments and the environment.

    Every argument is optional and falls back to an environment variable,
    so the runner can be configured entirely from CI variables. After
    parsing, each setting is validated and defaulted by its _load_* helper.

    :return: Validated settings namespace.
    :raises RuntimeError: If a required setting cannot be resolved by its
        helper (e.g. no compose file found, invalid git repository).
    """
    parser = ArgumentParser()
    parser.prog = "deploy"
    parser.allow_abbrev = True
    parser.formatter_class = ArgumentDefaultsHelpFormatter
    parser.description = "Docker Swarm cluster deployment runner"
    parser.epilog = (
        "The deployment runner should be configured using environment variables, if\n"
        "possible: It makes lots of assumptions about the source and target systems,\n"
        "so unless you have a very specific problem at hand which requires specific\n"
        "configuration, it will probably do its job just fine without any fancy\n"
        "customization of yours :-)"
    )
    # Positional arguments: all optional, each defaulting to the
    # correspondingly named environment variable.
    parser.add_argument(
        "service",
        nargs="?",
        default=getenv("SERVICE"),
        help="Name of the stack to deploy",
    )
    parser.add_argument(
        "version",
        nargs="?",
        default=getenv("VERSION"),
        help="Deployment revision of the stack",
    )
    parser.add_argument(
        "image_tag",
        nargs="?",
        default=getenv("IMAGE_TAG"),
        help="Docker image tag to run",
    )
    parser.add_argument(
        "compose_file",
        nargs="?",
        default=getenv("COMPOSE_FILE"),
        help="Compose file(s) to deploy from",
    )
    parser.add_argument(
        "env_var_prefix",
        nargs="?",
        default=getenv("ENV_VAR_PREFIX"),
        help="Prefix for configuration variables",
    )
    # Optional flags for Docker connectivity and authentication.
    parser.add_argument(
        "--environment",
        "-E",
        default=getenv("ENVIRONMENT"),
        help="Name of the deployment environment",
    )
    parser.add_argument(
        "--docker-host",
        "-H",
        default=getenv("CUSTOM_DOCKER_HOST"),
        help="Alternative Docker host (you probably don't need this)",
    )
    parser.add_argument(
        "--docker-timeout",
        "-T",
        default=getenv("CUSTOM_DOCKER_TIMEOUT"),
        help="Docker API timeout",
    )
    parser.add_argument(
        "--docker-api-version",
        "-V",
        default=getenv("DOCKER_API_VERSION"),
        help="Docker API SDK version",
    )
    parser.add_argument(
        "--docker-hub-credentials",
        "-C",
        # Accept either the *_USER/*_PASS or *_USERNAME/*_PASSWORD variable
        # pairs; strip(":") reduces a half-empty pair, and a fully empty
        # result falls through to None.
        default=(
            getenv("DOCKER_HUB_USER", getenv("DOCKER_HUB_USERNAME", ""))
            + ":"
            + getenv("DOCKER_HUB_PASS", getenv("DOCKER_HUB_PASSWORD", ""))
        ).strip(":")
        or None,
        help="Docker registry credentials, supplied as user:pass",
    )
    parser.add_argument(
        "--docker-registry",
        "-R",
        default=getenv("DOCKER_REGISTRY"),
        help="Address of the Docker registry to use",
    )
    # BitBucket pipeline metadata; see the notes on BitBucket assumptions.
    parser.add_argument(
        "--bitbucket-repository-name",
        default=getenv("BITBUCKET_REPO_FULL_NAME"),
        help="Full name of the repository, supplied as org/repo",
    )
    parser.add_argument(
        "--bitbucket-pipeline-id",
        default=getenv("BITBUCKET_PIPELINE_UUID"),
        help="UUID of the BitBucket pipeline",
    )
    parser.add_argument(
        "--bitbucket-workspace",
        default=getenv("BITBUCKET_WORKSPACE"),
        help="Name of the BitBucket workspace",
    )
    parser.add_argument(
        "--bitbucket-environment-id",
        default=getenv("BITBUCKET_DEPLOYMENT_ENVIRONMENT_UUID"),
        help="UUID of the BitBucket deployment environment",
    )
    args = parser.parse_args()
    # Validate and default each setting in turn; each helper returns the
    # (possibly modified) namespace.
    args = _load_service(args)
    args = _load_version(args)
    args = _load_image_tag(args)
    args = _load_environment_variable_prefix(args)
    args = _load_compose_file(args)
    args = _load_docker_host(args)
    args = _load_docker_timeout(args)
    args = _load_docker_api_version(args)
    args = _load_docker_hub_credentials(args)
    logger.debug("Validated input.")
    return args
def _load_service(args: Namespace) -> Namespace:
    """Resolve the service name, falling back to the repository slug.

    Appends the environment name as a suffix, if configured, so each
    deployment environment gets its own stack.
    """
    if not args.service:
        args.service = getenv("BITBUCKET_REPO_SLUG", "unknown")
        logger.info(
            "No service name specified, using $BITBUCKET_REPO_SLUG (%s).",
            args.service,
        )

    if args.environment:
        suffix = f"-{args.environment}"

        if not args.service.endswith(suffix):
            args.service += suffix
            logger.debug(
                "Appending environment to service name (%s).",
                args.service,
            )

    logger.debug("Using service name %s.", args.service)

    return args
def _load_version(args: Namespace) -> Namespace:
    """Resolve the deployment version, falling back to the build number."""
    if not args.version:
        args.version = getenv("BITBUCKET_BUILD_NUMBER", "1")
        logger.info(
            "No version specified, using $BITBUCKET_BUILD_NUMBER (%s).",
            args.version,
        )

    logger.debug("Using version %s.", args.version)

    return args
def _load_image_tag(args: Namespace) -> Namespace:
    """Resolve the image tag, falling back to the short git commit hash.

    :raises RuntimeError: If no tag is given and the working directory is
        not part of a git repository.
    """
    if not args.image_tag:
        try:
            repo = Repo(Path.cwd(), search_parent_directories=True)
            args.image_tag = repo.git.rev_parse(repo.head.object.hexsha, short=7)
        except InvalidGitRepositoryError as error:
            message = "Invalid git repository: " + str(error)
            raise RuntimeError(message) from error

        # Fixed duplicated word ("using using") in the log message.
        logger.info(
            "No image tag specified, using git hash (%s).",
            args.image_tag,
        )

    logger.debug("Using image tag %s.", args.image_tag)
    return args
def _load_environment_variable_prefix(args: Namespace) -> Namespace:
    """Resolve the prefix used for PREFIX_* environment variable lookups."""
    if not args.env_var_prefix:
        args.env_var_prefix = "APP"
        logger.info(
            "No env var prefix specified, using '%s'.",
            args.env_var_prefix,
        )

    logger.debug("Using env var prefix %s.", args.env_var_prefix)

    return args
def _load_compose_file(args: Namespace) -> Namespace:
    """Resolve the compose file(s), probing the working directory if unset.

    :raises RuntimeError: If no compose file can be found.
    """
    if not args.compose_file:
        args.compose_file = resolve_compose_file(args)

    if not args.compose_file:
        message = "No compose file specified: Aborting deployment"
        raise RuntimeError(message)

    separator = getenv("COMPOSE_FILE_SEPARATOR", ":")
    files = args.compose_file.split(separator)
    logger.debug("Using compose file(s) %s.", ", ".join(files))

    return args
def _load_docker_host(args: Namespace) -> Namespace:
if not args.docker_host:
args.docker_host = "unix:///var/run/docker.sock"
return args
logger.info("Using custom Docker host %s.", args.docker_host)
return args
def _load_docker_timeout(args: Namespace) -> Namespace:
if not args.docker_timeout:
args.docker_timeout = 10
return args
logger.info("Using custom Docker timeout %ds.", args.docker_timeout)
return args
def _load_docker_api_version(args: Namespace) -> Namespace:
if args.docker_api_version:
logger.info("Using custom Docker API version %s.", args.docker_api_version)
return args
def _load_docker_hub_credentials(args: Namespace) -> Namespace:
if not args.docker_hub_credentials or args.docker_hub_credentials == ":":
logger.info("No Docker Hub credentials provided.")
args.docker_hub_credentials = None
else:
(username, password) = args.docker_hub_credentials.split(":", 2)
args.docker_hub_credentials = (username, password)
if args.docker_registry:
logger.info("Using custom Docker registry at %s.", args.docker_registry)
return args
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment