Skip to content

Instantly share code, notes, and snippets.

@stigok
Created January 12, 2024 10:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stigok/9eca08a55f1105f336f0161e2a0fb37e to your computer and use it in GitHub Desktop.
Save stigok/9eca08a55f1105f336f0161e2a0fb37e to your computer and use it in GitHub Desktop.
Filter Terraform plan results
#!/usr/bin/env python3
"""
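Helper to generate `-target='<resource address>'` arguments for Terraform operations
from a saved Terraform plan file or from a newline-separated list of resource addresses.
Plan files are rendered by running `terraform show` (the result is cached per file
content); address lists are processed as plain text. Writes output to stdout.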
"""
import argparse
import hashlib
import io
import json
import logging
import os.path
import re
import subprocess
import sys
import tempfile
# Cache entries live in the system temp directory; the SHA-256 digest of the plan file is
# appended to this prefix to form the cache file path.
CACHE_FILE_PREFIX = os.path.join(tempfile.gettempdir(), ".tfilter_cache_file_")


def plan(*, input_file):
    """
    Return the text printed by `terraform show <plan file> -no-color`.

    The text is cached as a JSON string in a file named after the SHA-256 digest of the
    content of `input_file`, so an unchanged plan file is only rendered by Terraform once.
    A missing or unreadable cache entry is regenerated.

    Arguments:
      input_file  binary file object of the plan file. Its content is hashed, and its
                  `name` attribute is passed to Terraform when there is no usable cache
                  entry.

    Raises `subprocess.CalledProcessError` when `terraform show` exits non-zero.
    """
    digest = hashlib.file_digest(input_file, "sha256").hexdigest()
    cachepath = CACHE_FILE_PREFIX + digest

    # Try the cache first. Opening directly (instead of checking for existence first)
    # avoids a race with another process removing the file in between.
    try:
        with open(cachepath, encoding="utf-8") as f:
            cached = json.load(f)
    except FileNotFoundError:
        pass
    except (json.decoder.JSONDecodeError, UnicodeDecodeError):
        # A corrupt entry is not fatal; it is overwritten below.
        logging.exception("failed to parse cache file")
    else:
        logging.info("using cached plan")
        return cached

    logging.debug("no cached entry found -- running `terraform show`")
    try:
        rendered = subprocess.run(
            ["terraform", "show", input_file.name, "-no-color"],
            capture_output=True,
            check=True,
            text=True,
        ).stdout
    except subprocess.CalledProcessError as err:
        # stderr is captured, so without this the reason for the failure is never shown.
        logging.error("`terraform show` failed:\n%s", err.stderr)
        raise

    logging.debug("writing cache file to %s", cachepath)
    with open(cachepath, "w", encoding="utf-8") as f:
        json.dump(rendered, f)
    return rendered
def parse(
    *,
    input_file,
    name_filter=None,
    re_filter=None,
    ex_re_filter=None,
    input_format=None,
    output_format="target",
    out=sys.stdout,
):
    """
    Collect resource addresses from `input_file`, filter them and print them to `out`.

    Arguments:
      input_file     file object to read. With `input_format="address"` its content is
                     read directly, one address per line (blank lines are ignored). With
                     any other value it is passed to `plan()` and the returned plan text
                     is scanned for "# <address> will be ..." / "must be ..." lines.
      name_filter    keep only addresses containing this substring. Ignored when
                     `re_filter` is given.
      re_filter      keep only addresses matching this regular expression
                     (case-insensitive search).
      ex_re_filter   drop addresses matching this regular expression (case-insensitive
                     search). Applied after the other filters.
      output_format  'target' prints one line of space separated `-target='address'`
                     items; 'address' prints one address per line followed by an empty
                     line.
      out            text stream the result is written to.

    Each address is printed once, in order of first appearance.

    Raises `NotImplementedError` when `output_format` is neither 'target' nor 'address'.
    """
    targets = []

    if input_format == "address":
        # If each line is already an address, no parsing is needed.
        raw = input_file.read()
        if isinstance(raw, bytes):  # accept both binary and text streams
            raw = raw.decode()
        targets.extend(stripped for line in raw.splitlines() if (stripped := line.strip()))
    else:
        plan_str = plan(input_file=input_file)

        # Parse output from `terraform plan` to extract resource addresses.
        for line in plan_str.splitlines():
            # Start over again if this line appears. Terraform plan will also print out
            # resource drift, which is rarely interesting.
            if "Terraform will perform the following actions:" in line:
                targets.clear()
                continue

            if re.match(r"^\s+#.+(will|must) be \w+", line):
                logging.debug("match: %s", line)

                # Remove all text but the actual resource address. Tainted resources are
                # reported as "<address> is tainted, so must be replaced", so that
                # phrase has to go as well or the space check below would drop them.
                address = re.sub(r"^#\s*", "", line.strip())
                address = re.sub(r"\s*(is tainted, so\s+)?(will|must) be .+$", "", address)

                if " " not in address:  # prevents catching '# Warning: ' lines
                    targets.append(address)

    logging.debug("targets before filter: %s", targets)
    if re_filter:
        pat = re.compile(re_filter, flags=re.IGNORECASE)
        targets = [t for t in targets if pat.search(t)]
    elif name_filter:
        targets = [t for t in targets if name_filter in t]

    logging.debug("targets before exclude: %s", targets)
    if ex_re_filter:
        pat = re.compile(ex_re_filter, flags=re.IGNORECASE)
        targets = [t for t in targets if not pat.search(t)]

    # De-duplicate while keeping the order of first appearance, so that the output is
    # deterministic (a plain set() yields a different order from run to run).
    unique = list(dict.fromkeys(targets))

    if output_format == "target":
        print(" ".join(f"-target='{t}'" for t in unique), file=out)
    elif output_format == "address":
        for t in unique:
            print(t, file=out)
        print(file=out)
    else:
        raise NotImplementedError(f"output {output_format} is not implemented")
def test():
    """
    Self-test for the plan-text parsing done by `parse()`.

    Each case is one line of plan text together with the `-target` argument it is
    expected to produce. While the cases run, the module-level `plan()` is replaced by a
    stub that returns the content of the input stream unchanged, so neither Terraform nor
    the cache is involved.

    Raises `AssertionError` on the first mismatch.
    """
    global plan

    # A list of pairs rather than a dict: identical input lines would otherwise collapse
    # into a single dict key and silently reduce the number of cases.
    cases = [
        (" # module.foo.bar will be updated", "-target='module.foo.bar'"),
        (" # module.foo.bar must be replaced", "-target='module.foo.bar'"),
        ("    # module.foo.bar will be updated", "-target='module.foo.bar'"),
        ("\t# module.foo.bar must be replaced", "-target='module.foo.bar'"),
        (
            " # module.nrk_google_ne_prod.module.common.null_resource.ingress_controller_wildcard_cert[0] will be updated",
            "-target='module.nrk_google_ne_prod.module.common.null_resource.ingress_controller_wildcard_cert[0]'",
        ),
    ]

    def plan_stub(*, input_file):
        return input_file.read()

    real_plan = plan
    plan = plan_stub
    try:
        for s, expected in cases:
            inp = io.StringIO(s + "\n")
            out = io.StringIO()
            parse(input_file=inp, out=out)
            actual = out.getvalue()
            # parse() terminates its output with a newline.
            if actual != expected + "\n":
                # Raised explicitly so the check also runs under `python -O`.
                raise AssertionError("'%s' != '%s'" % (actual, expected + "\n"))
    finally:
        plan = real_plan
def main():
    """
    Command-line entry point.

    Parses the command line, configures logging (DEBUG with `--debug`, INFO otherwise)
    and then either runs the self-test (`--test`) or calls `parse()` with the remaining
    arguments. The input file defaults to `out.plan` in the current directory; it is
    opened read-only and closed again once `parse()` returns.
    """
    default_input = "out.plan"

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "-f",
        "--name-filter",
        help="return only targets that contain this substring (ignored when --re-filter is given)",
    )
    parser.add_argument(
        "-r",
        "--re-filter",
        help="a case-insensitive regular expression; return only targets that match (overrides --name-filter)",
    )
    parser.add_argument(
        "-e",
        "--ex-re-filter",
        help="a case-insensitive regular expression; return only targets that DO NOT match (applied after all other filters)",
    )
    parser.add_argument(
        "-i",
        "--input-format",
        choices=("plan", "address"),
        default="plan",
        help="format of the input. 'plan' is a saved plan file that is rendered with 'terraform show', while 'address' is a list with one resource address per line, like the output of 'terraform state list'.",
    )
    parser.add_argument(
        "-o",
        "--output-format",
        choices=("target", "address"),
        default="target",
        help="output format of the addresses. 'target' outputs space separated -target='address' items, while 'address' prints just newline separated addresses.",
    )
    parser.add_argument(
        "input_file",
        # Read-only is sufficient: the file is only hashed and read, never written.
        type=argparse.FileType("rb"),
        nargs="?",
        help=f"a Terraform binary plan file, or an address list with '-i address' (default: {default_input})",
        # The default is opened further down instead of by argparse, which would open it
        # while parsing and thereby make `--test` fail when no such file exists.
        default=None,
    )
    parser.add_argument(
        "--test",
        action="store_true",
        help="run tests",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="enable debug logging",
    )

    args = vars(parser.parse_args())

    logging.basicConfig(level=logging.DEBUG if args.pop("debug") else logging.INFO)

    if args.pop("test"):
        print("Running tests")
        test()
        return

    if args["input_file"] is None:
        try:
            args["input_file"] = open(default_input, "rb")
        except OSError as err:
            # Same kind of usage error argparse reports for an unreadable file argument.
            parser.error(f"can't open '{default_input}': {err}")

    logging.debug("Calling with args: %s", args)
    with args["input_file"]:
        parse(**args)
# Run the command-line interface only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment