Skip to content

Instantly share code, notes, and snippets.

@bneutra
Last active July 28, 2023 23:50
Show Gist options
  • Save bneutra/2b8907e0f3956e91e31abcc851624ef1 to your computer and use it in GitHub Desktop.
Save bneutra/2b8907e0f3956e91e31abcc851624ef1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import concurrent.futures
import logging
import os
import sys
import time
from collections import deque
import click
from dictdiffer import diff
CURRENT_USER = os.getlogin()
INFRA_ROOT = "/Users/brendan.neutra/infra"
sys.path.append(INFRA_ROOT)
from scripts.tfc_util import TerraformCloudApiClient, load_token # noqa: E402
INFRA_REPO = "https://github.com/benchling/infra"
@click.group()
def main() -> None:
pass
def plan_differ(before, after):
result = diff(before, after)
diff_list = list(result)
return output_diff(diff_list)
def output_diff(diff):
response = ""
for item in diff:
# TODO: dictdiffer has an irregular way of representing diffs
# i.e. path could be a string (simple key) or a list (nested path)
# change could be a tuple(str) (simple string change) or
# tuple(list), tuple(list(other)) for more complex changes
# we should build our own differ, use another tool or process
# the dictdiffer diff to be more tf plan like (i.e. human readable)
attr: str = str(item[1])
change: str = str(item[2])
if attr:
response += f"attribute: {attr}\n"
if len(change) > 5000:
change = change[:5000] + "... truncated"
response += f"{change}\n"
return response
def process_plan_response(plan_json, terse):
resources=plan_json.get('resource_changes')
updates=[item for item in resources if 'update' in item['change']['actions']]
deletes=[item for item in resources if 'delete' in item['change']['actions']]
creates=[item for item in resources if 'create' in item['change']['actions']]
if not updates and not deletes and not creates:
return False
for change in creates:
diff = plan_differ(change["change"]["before"], change["change"]["after"])
address = change['address']
click.secho(f"CREATE: ", fg='green', nl=False)
click.echo(address)
if not terse:
click.secho(f"{diff}", fg='green')
for change in updates:
diff = plan_differ(change["change"]["before"], change["change"]["after"])
address = change['address']
click.secho(f"UPDATE: ", fg='yellow', nl=False)
click.echo(address)
if not terse:
click.secho(f"{diff}", fg='yellow')
for change in deletes:
diff = plan_differ(change["change"]["before"], change["change"]["after"])
address = change['address']
click.secho(f"DELETE: ", fg='red', nl=False)
click.echo(address)
if not terse:
click.secho(f"{diff}", fg='red')
return True
def get_latest_runs(client, branch, pr, tag_filters, dry_run):
workspaces_data = client.list_workspaces(tag_filters=tag_filters)
workspaces = []
skipped = 0
for ws in workspaces_data:
skip_msg = f"Skipping {ws.name}, {ws.current_run_id}"
if ws.execution_mode != "agent":
logging.debug(f"{skip_msg} ws mode is not 'agent'")
skipped +=1
continue
if ws.vcs_branch != branch:
logging.debug(f"{skip_msg} ws branch is not {branch}")
skipped += 1
continue
if ws.vcs_repo != INFRA_REPO:
skipped += 1
logging.debug(f"{skip_msg} repo is not {INFRA_REPO}")
continue
run_info = client.get_run(ws.current_run_id)
this_commit_msg: str = run_info.message.split("\n")[0]
if pr and f"(#{pr})" not in this_commit_msg:
print(f"{skip_msg} PR not {pr}'")
continue
if run_info.status != "planned":
if not dry_run:
print(f"{skip_msg} status is '{run_info.status}' not 'planned'")
continue
if ws.current_run_id != ws.latest_run_id:
print(f"INFO: current_run_id != latest_run_id")
workspaces.append(
{
"id": ws.id,
"name": ws.name,
"current_run": ws.current_run_id, # should we use latest_run_id? they seem the same
"commit_message": this_commit_msg
}
)
print(
f"{len(workspaces)} runs to apply!, {skipped} workspaces silently skipped "
"due to branch, repo, or agent_mode mismatch\n"
"Now retrieving plans in the background..."
)
return workspaces
def process_plan(run_info: list):
client: TerraformCloudApiClient = TerraformCloudApiClient(api_token=load_token())
return client.get_plan_in_json(run_id=run_info[1], plan_id=None)
def poll_for_completion(client, runs):
start_time = time.time()
deadline_s = 900
print(f"Checking {len(runs)} runs for completion... timeout is {deadline_s}s")
poll_s = 1
runq = deque(runs)
applied = 0
while time.time() < start_time + deadline_s and runq:
run = runq.pop()
response = client.get_run(run[1])
if response.status == "applied":
print(f"{run[0]} was applied!")
applied += 1
continue
elif response.status == "errored":
print(f"{run[0]} was errored!")
continue
runq.appendleft(run)
time.sleep(poll_s)
print(f"{applied} out of {len(runs)} runs were applied successfully")
@main.command()
@click.option("--pr", required=False, default="", help="PR number to filter runs by")
@click.option("--branch", required=False, default="main", help="infra branch to filter runs by, default main")
@click.option("--terse", is_flag=True, default=False, help="Output abbreviated plan changes")
@click.option("--dry-run", is_flag=True)
@click.argument("tags", nargs=-1, required=True)
def cli_main(pr, branch, terse, dry_run, tags):
"""Scan workspaces by TFC tags, interactively review plans, apply."""
if not pr and not branch:
sys.exit("You must provide one of either --pr or --branch")
client: TerraformCloudApiClient = TerraformCloudApiClient(api_token=load_token())
workspaces = get_latest_runs(client, branch, pr, tags, dry_run)
runs = [[ws.get("name"), ws.get("current_run")] for ws in workspaces]
applied = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future_to_run = {executor.submit(process_plan, run_info): run_info for run_info in runs}
for future in concurrent.futures.as_completed(future_to_run):
run_info = future_to_run[future]
try:
plan_data = future.result()
ws_name = run_info[0]
run_id = run_info[1]
print("+" * 80)
click.secho(f"+++ {ws_name} +++", fg='green')
result = process_plan_response(plan_data, terse)
if not result:
# TFC sometimes wants a plan applied even if no changes exist
click.secho(f"WARNING: plan produced no changes, apply anyway?", fg="yellow")
if dry_run:
if click.confirm(f"--dry-run review only {ws_name}, next?"):
pass
continue
else:
if not click.confirm(f"Proceed with the above plan for {ws_name}?"):
click.echo(f"Skippping {ws_name}...")
continue
click.echo(f"Asynchronously applying {ws_name} run: {run_id}...")
client.apply_run(run_id, f"tfc_plan_util.py applied by {CURRENT_USER}")
applied.append((ws_name, run_id))
except Exception as exc:
print('%r generated an exception: %s' % (run_info, exc))
print(f"{len(applied)} of {len(runs)} plans were applied")
if applied:
poll_for_completion(client, applied)
if __name__ == "__main__":
cli_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment