Created
January 12, 2024 10:12
-
-
Save stigok/9eca08a55f1105f336f0161e2a0fb37e to your computer and use it in GitHub Desktop.
Filter Terraform plan results
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
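"""
Helper to generate `-target='<resource address>'` arguments to terraform operations
from a saved Terraform plan file, or from a plain list of resource addresses.
A plan file is rendered to text with `terraform show`, and the rendered text is cached
in the system temp directory; no other Terraform command is run.
Writes output to stdout.
"""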
import argparse | |
import hashlib | |
import io | |
import json | |
import logging | |
import os.path | |
import re | |
import subprocess | |
import sys | |
import tempfile | |
# Cache files are named "<prefix><sha256 hex digest of the plan file>" and live
# in the system temp directory.
CACHE_FILE_PREFIX = os.path.join(tempfile.gettempdir(), ".tfilter_cache_file_")


def plan(*, input_file):
    """Return the human-readable rendering of a saved Terraform plan file.

    The text produced by `terraform show` is cached on disk, keyed by the
    SHA-256 digest of the plan file's contents, so repeated invocations on the
    same plan do not have to run Terraform again. The cache is best-effort: an
    unreadable or corrupt entry is ignored and regenerated, and a failure to
    write the entry does not discard the freshly rendered plan.

    Args:
        input_file: Plan file opened in binary mode. Its contents are hashed
            and its ``name`` attribute is passed to `terraform show`.

    Returns:
        The plan rendered as text (without colour codes).

    Raises:
        subprocess.CalledProcessError: If `terraform show` exits non-zero.
        OSError: If the `terraform` executable cannot be started.
    """
    digest = hashlib.file_digest(input_file, "sha256").hexdigest()
    cachepath = CACHE_FILE_PREFIX + digest

    # EAFP: open the cache entry directly instead of checking for existence
    # first, which would race with other processes touching the temp dir.
    try:
        with open(cachepath, encoding="utf-8") as f:
            cached = json.load(f)
    except FileNotFoundError:
        logging.debug("no cached entry found -- running `terraform show`")
    except (OSError, json.JSONDecodeError):
        logging.exception("failed to read cache file %s -- regenerating", cachepath)
    else:
        logging.info("using cached plan")
        return cached

    try:
        plan_text = subprocess.run(
            ["terraform", "show", input_file.name, "-no-color"],
            capture_output=True,
            check=True,
            text=True,
        ).stdout
    except subprocess.CalledProcessError as err:
        # stderr is captured, so without this the user never sees why
        # Terraform rejected the plan file.
        logging.error("`terraform show` failed:\n%s", err.stderr)
        raise

    try:
        # The rendered plan may contain values the user considers secret and
        # the temp directory is typically shared, so create the cache entry
        # readable by the current user only.
        fd = os.open(cachepath, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            logging.debug("writing cache file to %s", cachepath)
            json.dump(plan_text, f)
    except OSError:
        logging.warning("could not write cache file %s", cachepath, exc_info=True)

    return plan_text
# Everything printed before this header (e.g. the resource drift report) is
# discarded when scanning plan text.
_PLAN_ACTIONS_HEADER = "Terraform will perform the following actions:"
# An indented "# <address> will be <action>" / "# <address> must be <action>" line.
_RESOURCE_LINE_RE = re.compile(r"^\s+#\s*(?P<address>.+?)\s*(?:will|must) be \w+")
# Tainted resources are announced as "# <address> is tainted, so must be replaced".
_TAINTED_SUFFIX = " is tainted, so"
_OUTPUT_FORMATS = ("target", "address")


def _addresses_from_plan(plan_text):
    """Return the resource addresses announced in human-readable plan text."""
    addresses = []
    for line in plan_text.splitlines():
        # Start over when the actions header appears: anything collected so
        # far came from the preceding sections, which are rarely interesting.
        if _PLAN_ACTIONS_HEADER in line:
            addresses.clear()
            continue
        match = _RESOURCE_LINE_RE.match(line)
        if match is None:
            continue
        logging.debug("match: %s", line)
        address = match["address"].removesuffix(_TAINTED_SUFFIX).strip()
        # A remaining space means this was not a bare address (this prevents
        # catching '# Warning: ' lines); an empty string is never a target.
        if address and " " not in address:
            addresses.append(address)
    return addresses


def _addresses_from_lines(input_file):
    """Return the non-blank, stripped lines of *input_file* (bytes or text)."""
    raw = input_file.read()
    text = raw.decode() if isinstance(raw, (bytes, bytearray)) else raw
    return [stripped for line in text.splitlines() if (stripped := line.strip())]


def parse(
    *,
    input_file,
    name_filter=None,
    re_filter=None,
    ex_re_filter=None,
    input_format=None,
    output_format="target",
    out=sys.stdout,
):
    """Collect resource addresses from *input_file*, filter them and print them.

    Duplicate addresses are printed once, in order of first appearance.

    Args:
        input_file: Open file object. With ``input_format="address"`` it is
            read directly, one address per line. Otherwise it is handed to
            `plan()` and the returned plan text is scanned for resource lines.
        name_filter: Keep only addresses containing this substring. Ignored
            when ``re_filter`` is given.
        re_filter: Keep only addresses matching this regular expression
            (case-insensitive search).
        ex_re_filter: Drop addresses matching this regular expression
            (case-insensitive search); applied after the other filters.
        input_format: ``"address"`` for a plain address list; any other value
            means a Terraform plan file.
        output_format: ``"target"`` prints one line of space-separated
            ``-target='<address>'`` arguments; ``"address"`` prints one
            address per line followed by an empty line.
        out: Text stream the result is written to.

    Raises:
        NotImplementedError: If ``output_format`` is not supported.
    """
    # Fail before doing any work (possibly running Terraform) on a bad format.
    if output_format not in _OUTPUT_FORMATS:
        raise NotImplementedError(f"output {output_format} is not implemented")

    if input_format == "address":
        # If each line is already an address, no parsing is needed.
        targets = _addresses_from_lines(input_file)
    else:
        targets = _addresses_from_plan(plan(input_file=input_file))

    logging.debug("targets before filter: %s", targets)
    if re_filter:
        include = re.compile(re_filter, flags=re.IGNORECASE)
        targets = [t for t in targets if include.search(t)]
    elif name_filter:
        targets = [t for t in targets if name_filter in t]

    logging.debug("targets before exclude: %s", targets)
    if ex_re_filter:
        exclude = re.compile(ex_re_filter, flags=re.IGNORECASE)
        targets = [t for t in targets if not exclude.search(t)]

    # De-duplicate while keeping a stable order; a set would make the output
    # order vary between runs.
    unique_targets = list(dict.fromkeys(targets))

    if output_format == "target":
        print(" ".join(f"-target='{t}'" for t in unique_targets), file=out)
    else:
        for t in unique_targets:
            print(t, file=out)
        print(file=out)
def test():
    """Self-test for the plan-text scanning done by `parse()`.

    `parse()` obtains plan text through `plan()`, which hashes a binary file
    and runs `terraform show`. For the duration of the test `plan` is replaced
    by a stub that returns the contents of the in-memory input unchanged, so
    no Terraform installation or plan file is needed.

    Raises:
        AssertionError: If any case produces unexpected output.
    """
    global plan

    # (plan text line, expected output without the trailing newline). A list
    # of pairs rather than a dict, so cases that differ only in whitespace are
    # not silently collapsed.
    cases = [
        ("  # module.foo.bar will be updated", "-target='module.foo.bar'"),
        ("  # module.foo.bar must be replaced", "-target='module.foo.bar'"),
        ("\t# module.foo.bar will be updated", "-target='module.foo.bar'"),
        ("      # module.foo.bar must be replaced", "-target='module.foo.bar'"),
        (
            "  # module.nrk_google_ne_prod.module.common.null_resource.ingress_controller_wildcard_cert[0] will be updated",
            "-target='module.nrk_google_ne_prod.module.common.null_resource.ingress_controller_wildcard_cert[0]'",
        ),
    ]

    def read_plan_text(*, input_file):
        return input_file.read()

    real_plan = plan
    plan = read_plan_text
    try:
        for text, expected in cases:
            inp = io.StringIO(text + "\n")
            out = io.StringIO()
            parse(input_file=inp, out=out)
            # parse() terminates its output with a newline.
            actual = out.getvalue().rstrip("\n")
            if actual != expected:
                raise AssertionError("'%s' != '%s'" % (actual, expected))
    finally:
        plan = real_plan
def main():
    """Command-line entry point: parse arguments, then run `test()` or `parse()`.

    The input file is opened read-only, and only after `--test` has been
    handled, so the self-test does not require a plan file to exist. An
    input path of '-' reads from standard input.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "-f",
        "--name-filter",
        help="a string to return only targets that contain it as a substring",
    )
    parser.add_argument(
        "-r",
        "--re-filter",
        help="a regular expression to return only targets that matches (overrides --name-filter)",
    )
    parser.add_argument(
        "-e",
        "--ex-re-filter",
        help="a regular expression to return only targets that DOES NOT match (applied after all other filters)",
    )
    parser.add_argument(
        "-i",
        "--input-format",
        choices=("plan", "address"),
        default="plan",
        help="format of the input. 'plan' is the output from 'terraform plan -no-color', while address is the output of 'terraform state list'.",
    )
    parser.add_argument(
        "-o",
        "--output-format",
        choices=("target", "address"),
        default="target",
        help="output format of the addresses. 'target' outputs space separated '-target=address' items, while 'address' prints just newline separated addresses.",
    )
    # Taken as a plain path rather than argparse.FileType: FileType would open
    # the default "out.plan" during argument parsing (breaking --test when the
    # file is absent) and never close it.
    parser.add_argument(
        "input_file",
        nargs="?",
        default="out.plan",
        help="a Terraform binary plan file, or with '-i address' a file of resource addresses ('-' reads standard input)",
    )
    parser.add_argument(
        "--test",
        action="store_true",
        help="run tests",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="enable debug logging",
    )

    args = vars(parser.parse_args())
    logging.basicConfig(level=logging.DEBUG if args.pop("debug") else logging.INFO)

    if args.pop("test"):
        print("Running tests")
        test()
        return

    logging.debug("Calling with args: %s", args)
    input_path = args.pop("input_file")

    if input_path == "-":
        parse(input_file=sys.stdin.buffer, **args)
        return

    try:
        # Read-only binary: the file is only hashed/read, never written.
        input_file = open(input_path, "rb")
    except OSError as err:
        parser.error(f"can't open '{input_path}': {err}")
    with input_file:
        parse(input_file=input_file, **args)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment