Skip to content

Instantly share code, notes, and snippets.

@jayalane
Created June 25, 2021 19:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jayalane/a71575dc06e6690efd40d3c0c5692fa8 to your computer and use it in GitHub Desktop.
Save jayalane/a71575dc06e6690efd40d3c0c5692fa8 to your computer and use it in GitHub Desktop.
"""parses terraform code and provides SAX-style callbacks to code that
can return values to change the terraform. I re-read
http://www.paulgraham.com/avg.html last week and had a conversation
about what capabilities the TF team is building. The ability to
change our code with code is a useful capability.
Paul G:
As long as our hypothetical Blub programmer is looking down the
power continuum, he knows he's looking down. Languages less
powerful than Blub are obviously less powerful, because they're
missing some feature he's used to. But when our hypothetical Blub
programmer looks in the other direction, up the power continuum,
he doesn't realize he's looking up. What he sees are merely weird
languages. He probably considers them about equivalent in power to
Blub, but with all this other hairy stuff thrown in as well. Blub
is good enough for him, because he thinks in Blub.
Python is powerful enough for us in IaC.
Done: terraform acl/bucket/encrypt.
modules from APIE not users.
remove versions from provider/version
ToDo: Remove aes256
Also: https://steve-yegge.blogspot.com/2007/06/rich-programmer-food.html
on why parsing is such a powerful tool in the toolbox.
Currently, this does an os.walk over the tree, pulling out all the .tf
files, and then parses through them a line at a time. It is not doing
char-by-char parsing, and can probably be broken by things like an
unmatched } in quoted text. (The per-line parsing will tend to be in the
code calling tfparse; e.g. converting set => toset involved checking for
a ) that is not inside quotes.)
But it's a relatively rigorous state-machine sort of parser (line
by line, not char by char), not just a bunch of regexps.
"""
import os
import os.path
import enum
import accounts
import remove_quoted
# When true, p() forwards its arguments to print(); otherwise p() is silent.
DEBUG=False
# Account lookup tables, filled in by setup_maps():
# ID_TO_NAME maps ac['acct_id'] -> account name, NAME_TO_ID is the reverse.
ID_TO_NAME = {} # ID_TO_NAME: Dict[str, str]
NAME_TO_ID = {}
def setup_maps():
    """Fill ID_TO_NAME and NAME_TO_ID from accounts.all_accounts_sorted().

    Each item yielded by accounts.all_accounts_sorted() is unpacked as a
    pair; the first element is indexed with 'name' and 'acct_id', the second
    is ignored.

    Account names are used as dictionary keys, so repeated names are made
    unique by appending an occurrence number: the first account keeps its
    plain name, the second becomes name + "2", the third name + "3", and so
    on. (Previously every repeat became name + "2", so a third account with
    the same name overwrote the second one in NAME_TO_ID.)
    """
    occurrences = {}
    for ac, _ in accounts.all_accounts_sorted():
        base_name = ac['name']
        count = occurrences.get(base_name, 0) + 1
        occurrences[base_name] = count
        name = base_name if count == 1 else base_name + str(count)
        print ("Cataloging", name)
        # The module-level dicts are mutated in place, so no `global` is needed.
        ID_TO_NAME[ac['acct_id']] = name
        NAME_TO_ID[name] = ac['acct_id']
def p(*args, **kwargs):
    """Debug print: behave exactly like print(), but only while DEBUG is true."""
    if not DEBUG:
        return
    print(*args, **kwargs)
def file_walk(rootDir = '.', suffix="tf"):
    """Yield the paths of all files under rootDir whose extension is suffix.

    Args:
        rootDir: a directory to walk recursively, or a single file path.
        suffix: file extension to match, without the leading dot.

    Yields:
        rootDir itself when it is a matching file; otherwise
        dirName + "/" + fname for every matching file found by os.walk().
        Directories whose path contains ".terraform" are skipped.

    The extension is the text after the LAST dot, for files found in the
    walk just as for a single-file rootDir. (The walk used to look at the
    text after the first dot, which missed "a.b.tf" and accepted
    "x.tf.bak".)
    """
    def has_suffix(name):
        # rsplit keeps only the final extension, e.g. "a.b.tf" -> "tf"
        return "." in name and name.rsplit(".", 1)[-1] == suffix

    if os.path.isfile(rootDir) and has_suffix(rootDir):
        yield rootDir
        return
    for dirName, _, fileList in os.walk(rootDir):
        if ".terraform" in dirName:
            continue
        for fname in fileList:
            if has_suffix(fname):
                # "/" is used deliberately: handle_file splits paths on "/"
                yield dirName + "/" + fname
class parseState(enum.Enum):
    """States of the line-by-line parser in handle_file.

    Values are assigned by enum.auto(), which numbers members 1, 2, 3, ...
    in declaration order.
    """
    # placeholder; also the initial value of the "state to resume" variable
    NONE = enum.auto()
    # between blocks, at brace depth 0
    TOP_LEVEL = enum.auto()
    # inside a block
    SUB_LEVEL = enum.auto()
    # copying heredoc lines until the terminator line is seen
    SKIPPING_HEREDOC = enum.auto()
    # copying /* ... */ comment lines until a line ending in */ is seen
    SKIPPING_MULTILINE_COMMENT = enum.auto()
class returnCodes(enum.Enum):
    """bit field

    Result codes returned by callbacks to handle_file. The non-zero values
    are distinct bits, so a callback may return the int sum of several
    .value attributes to request more than one action at once.
    """
    # keep the original line
    RC_UNCHANGED = 0
    # replace the current line
    RC_REPLACE_LINE = 1 << 0
    # also write an extra "auto_" file
    RC_NEW_FILE = 1 << 1
    # replace the whole buffered top-level section
    RC_REPLACE_SECTION = 1 << 2
def _pass_through(orig_line, pending_output, f_new_lines):
    """Copy a line that is not parsed (comment, blank line) to the output.

    The line joins the section currently being buffered when there is one,
    so that it stays in order with that section; otherwise it is appended
    directly to the finished output lines.
    """
    if pending_output:
        p ("PENDING OUT:", orig_line.strip('\n'))
        pending_output.append(orig_line)
    else:
        p ("OUT:", orig_line.strip('\n'))
        f_new_lines.append(orig_line)


def _with_newline(text):
    """Return text ending in a newline, without doubling an existing one."""
    return text if text.endswith("\n") else text + "\n"


def handle_file(start_path, cbs):
    """Parse every .tf file under start_path and let callbacks rewrite it.

    pylint says this has too many if statements etc. It's a long state
    machine, restarted for each file and advanced one line at a time.

    Args:
        start_path: file or directory handed to file_walk().
        cbs: object providing these callbacks:
            file_cb(path): called once before a file is parsed.
            top_cb(tokens, orig_line): called for each top-level line with
                a copy of the space-separated tokens that precede any
                "{" or "[".
            sub_cb(stack, orig_line): called for each line inside a block;
                stack is a copy of the token lists, innermost first.
                Returns a tuple (rc, new_line).
            sub_exit_cb(stack, orig_line): called for a closing line that
                brings the nesting level back to 0. Returns a tuple
                (rc, new_line, extra_file_text, new_section); only the
                entries selected by rc are read.
            rc is a returnCodes member or the int sum of member values.

    Raises:
        Exception: if the state machine is in a state it cannot handle.

    Each file is written to "<file>.new"; that copy replaces the original
    only when a callback requested a change, otherwise it is deleted.
    """
    print ("Walking", start_path)
    rc_unchanged = returnCodes.RC_UNCHANGED.value
    rc_line = returnCodes.RC_REPLACE_LINE.value
    rc_new_file = returnCodes.RC_NEW_FILE.value
    rc_section = returnCodes.RC_REPLACE_SECTION.value
    skip_states = (parseState.SKIPPING_HEREDOC,
                   parseState.SKIPPING_MULTILINE_COMMENT)
    for fn in file_walk(start_path):
        cbs.file_cb(fn)
        state = parseState.TOP_LEVEL
        brace_depth = 0
        level = 0
        here_marker = ''
        skipping_state = parseState.NONE  # state to resume after a skip
        change = False
        thing_stack = []
        pending_output = []  # lines of the top-level section in progress
        f_new_lines = []     # finished output lines
        with open(fn, 'r') as f:
            with open(fn + '.new', 'w') as f_new:
                for orig_line in f:
                    line = orig_line.strip('\n')
                    no_quotes_line = remove_quoted.remove_quoted(line)
                    p ("IN:", line)
                    p ("STATE:", state)
                    p ("PENDING OUT LEN", len(pending_output))
                    p ("BRACE DEPTH", brace_depth)
                    if (state == parseState.TOP_LEVEL and
                            brace_depth == 0 and pending_output):
                        # A complete top-level section is buffered: flush it.
                        p ("OUT TOP LEVEL", "".join(pending_output))
                        f_new_lines.extend(pending_output)
                        pending_output = []
                        thing_stack = []
                    if skipping_state != parseState.NONE:
                        p ("SKIPPING STATE:", skipping_state)
                    # strips all multi-spaces to 1 space (and outer spaces)
                    line = " ".join([a for a in line.split(" ") if a])

                    # Blank lines and one-line comments are copied verbatim.
                    # "//" is matched with startswith: comparing a single
                    # character against "//" can never be true.
                    if (not line or line.startswith("#")
                            or line.startswith("//")):
                        _pass_through(orig_line, pending_output, f_new_lines)
                        continue

                    # block comments should parse at char level
                    if state not in skip_states and line.startswith("/*"):
                        # A comment closed on the same line must not switch
                        # to skipping, or following lines would be swallowed.
                        if not (len(line) >= 4 and line.endswith("*/")):
                            skipping_state = state
                            state = parseState.SKIPPING_MULTILINE_COMMENT
                        _pass_through(orig_line, pending_output, f_new_lines)
                        continue

                    if state == parseState.NONE:
                        raise Exception("Illegal State", line)

                    if state == parseState.TOP_LEVEL:
                        if ("{" not in no_quotes_line and
                                "[" not in no_quotes_line):
                            top_thing = line.strip(" ").split(" ")
                        elif "{" in no_quotes_line:
                            if "}" not in no_quotes_line:
                                brace_depth = brace_depth + 1
                            # needs fixing noquotes
                            top_thing = line.split("{")[0].strip(" ").split(" ")
                        else:  # must be [
                            if "]" not in no_quotes_line:
                                brace_depth = brace_depth + 1
                            # needs fixing noquotes
                            top_thing = line.split("[")[0].strip(" ").split(" ")
                        if brace_depth > 0:
                            # Only a block left open on this line has a body.
                            # A one-liner such as `variable "x" {}` stays at
                            # top level so the next line is not parsed as
                            # its child.
                            state = parseState.SUB_LEVEL
                            level = 1
                        cbs.top_cb(top_thing[:], orig_line)
                        thing_stack = [top_thing]
                        pending_output.append(orig_line)

                    elif state == parseState.SUB_LEVEL:
                        if (("]" in no_quotes_line and "[" not in no_quotes_line) or
                                ("}" in no_quotes_line and "{" not in no_quotes_line)):
                            # this won't check nesting properly just numbers
                            brace_depth = brace_depth - 1
                            old_thing_stack = thing_stack[:]
                            if brace_depth == 0:
                                state = parseState.TOP_LEVEL
                                level = 0
                            else:
                                state = parseState.SUB_LEVEL
                                level = level - 1
                            if thing_stack:
                                del thing_stack[0]
                            if level == 0:
                                # NOTE(review): indentation was lost in the
                                # copy this was restored from. As written,
                                # level is 0 only when brace_depth is 0, so
                                # the exit callback sees a one-entry stack;
                                # confirm whether it was meant to run for
                                # nested blocks as well.
                                res = cbs.sub_exit_cb(old_thing_stack, orig_line)
                                rc = res[0]
                                if not isinstance(rc, int):
                                    rc = rc.value
                                if rc == rc_unchanged:
                                    pending_output.append(orig_line)
                                else:
                                    # these are all possible in one return
                                    if (state == parseState.TOP_LEVEL and
                                            (rc & rc_section) == rc_section):
                                        # copy so later appends do not touch
                                        # the callback's own list
                                        pending_output = list(res[3])
                                        change = True
                                    if (rc & rc_line) == rc_line:
                                        pending_output.append(_with_newline(res[1]))
                                        change = True
                                    else:
                                        pending_output.append(orig_line)
                                    if (rc & rc_new_file) == rc_new_file:
                                        extra_output = res[2]
                                        # dirname/basename keep the file next
                                        # to fn even when fn has no "/"
                                        extra_path = os.path.join(
                                            os.path.dirname(fn),
                                            "auto_" + os.path.basename(fn))
                                        with open(extra_path, 'w') as extra:
                                            p ("OUT:", extra_output)
                                            extra.write(extra_output)
                            else:
                                # Closing line of a nested block: keep it,
                                # otherwise it would vanish from the output.
                                pending_output.append(orig_line)
                            if state == parseState.TOP_LEVEL:
                                f_new_lines.extend(pending_output)
                                pending_output = []
                            continue

                        opened_block = False
                        if '<<' in no_quotes_line:
                            skipping_state = parseState.SUB_LEVEL
                            state = parseState.SKIPPING_HEREDOC
                            head, _, marker = line.partition('<<')
                            # "<<-EOF" is the indented heredoc form; the
                            # terminator line is still just "EOF".
                            here_marker = marker.lstrip('-').strip()
                            # The attribute being assigned is what precedes
                            # "<<" (it used to be left over from an earlier
                            # line).
                            sub_thing = head.strip(" ").split(" ")
                        elif ("{" not in no_quotes_line and
                                "[" not in no_quotes_line):
                            sub_thing = line.strip(" ").split(" ")
                        else:
                            if "{" in no_quotes_line:
                                opener, closer = "{", "}"
                            else:
                                opener, closer = "[", "]"
                            if closer not in no_quotes_line:
                                brace_depth = brace_depth + 1
                                level = level + 1
                                opened_block = True
                            sub_thing = line.split(opener)[0].strip(" ").split(" ")
                        thing_stack.insert(0, sub_thing)
                        new_l = cbs.sub_cb(thing_stack[:], orig_line)
                        if not opened_block:
                            # Only a block left open keeps its stack entry;
                            # leaf lines (including `x = []`) are popped at
                            # once so the stack depth matches the nesting.
                            del thing_stack[0]
                        rc = new_l[0]
                        if not isinstance(rc, int):
                            rc = rc.value
                        if (rc & rc_line) == rc_line:
                            p ("PENDING OUT REPLACE:", new_l[1])
                            pending_output.append(_with_newline(new_l[1]))
                            change = True
                        else:
                            p ("PENDING OUT", orig_line.strip('\n'))
                            pending_output.append(orig_line)

                    elif state == parseState.SKIPPING_HEREDOC:
                        if line == here_marker:
                            state = skipping_state
                            here_marker = ''
                        p ("PENDING OUT:", orig_line.strip("\n"))
                        pending_output.append(orig_line)

                    elif state == parseState.SKIPPING_MULTILINE_COMMENT:
                        if line.endswith("*/"):
                            state = skipping_state
                        _pass_through(orig_line, pending_output, f_new_lines)

                    else:
                        raise Exception("Illegal State", line)

                    # A code line may open a block comment at its very end.
                    if state not in skip_states and line.endswith("/*"):
                        skipping_state = state
                        state = parseState.SKIPPING_MULTILINE_COMMENT

                # Whatever is still buffered at end of file is output too.
                f_new_lines.extend(pending_output)
                pending_output = []
                f_new.write("".join(f_new_lines))
        if not change:
            os.unlink(fn + ".new")
        else:
            # os.replace overwrites an existing destination on every platform
            os.replace(fn + ".new", fn)
class CallBacks:
    """Callbacks for handle_file that enforce settings on the S3 backend.

    Inside a `terraform { backend "s3" { ... } }` block this class
      * rewrites the value of `bucket` to "terraform-state-us-east-1", and
      * on leaving the backend block adds any of `dynamodb_table`, `acl`
        and `encrypt` that were not seen, requesting an extra file with the
        lock-table resource when `dynamodb_table` had to be added.
    """
    file_name = ""
    keys_for_terraform = [] # list(str)

    # Text for the extra "auto_" file that declares the lock table.
    _LOCK_TABLE_RESOURCE = (
        'resource "aws_dynamodb_table" "terraform_state_lock" {\n'
        'name = "terraform-lock"\n'
        'read_capacity = 5\n'
        'write_capacity = 5\n'
        'hash_key = "LockID"\n'
        'attribute {\n'
        'name = "LockID"\n'
        'type = "S"\n'
        '}\n'
        '}\n'
    )

    def __init__(self):
        # Per-instance state: the class-level list above would otherwise be
        # shared (and appended to) by every instance.
        self.file_name = ""
        self.keys_for_terraform = []

    def file_cb(self, things):
        """Remember the path of the file that is about to be parsed."""
        self.file_name = things
        p ("/" + things)
        p ("=================")

    def top_cb(self, thing_stack, line):
        """Reset the list of seen backend keys when a `terraform` line starts.

        handle_file passes the token list of the top-level line. Popping an
        element from it yielded a single string, whose first character can
        never equal "terraform", so the tokens are now used directly. A list
        of token lists is accepted too; its last entry is used.
        """
        things = thing_stack
        if things and isinstance(things[-1], (list, tuple)):
            things = things[-1]
        if not things or things[0] != "terraform":
            return
        self.keys_for_terraform = []
        print ("1/" + self.file_name + "/" + "/".join(things))

    def sub_exit_cb(self, thing_stack, line):
        """Add missing backend settings in front of the closing line.

        Args:
            thing_stack: token lists, innermost first; only a two-entry
                stack of `backend "s3"` inside `terraform` is acted on.
            line: the original closing line.

        Returns:
            (returnCodes.RC_UNCHANGED,) when nothing has to be added,
            otherwise (rc, replacement_line, extra_file_text) where rc is
            RC_REPLACE_LINE, combined with RC_NEW_FILE only when there is
            text for the extra file.
        """
        if len(thing_stack) != 2:
            return (returnCodes.RC_UNCHANGED,)
        top = thing_stack[1]
        second = thing_stack[0]
        if not top or top[0] != "terraform":
            return (returnCodes.RC_UNCHANGED,)
        # Same test as sub_cb: keys are only collected for the s3 backend,
        # so no other backend may be "completed" with s3 settings.
        if len(second) < 2 or second[0] != "backend" or second[1] != '"s3"':
            return (returnCodes.RC_UNCHANGED,)
        adding = ""
        extra_output = ""
        print ("Keys are", self.keys_for_terraform)
        if "dynamodb_table" not in self.keys_for_terraform:
            print ("Adding dynabmodb_table line 2/" + self.file_name + "/" + "/".join(top + second))
            adding += """ dynamodb_table = "terraform-lock"\n"""
            extra_output = self._LOCK_TABLE_RESOURCE
        if "acl" not in self.keys_for_terraform:
            print ("Adding acl line 2/" + self.file_name + "/" + "/".join(top + second))
            adding += """ acl = "bucket-owner-full-control"\n"""
        if "encrypt" not in self.keys_for_terraform:
            print ("Adding encrypt line 2/" + self.file_name + "/" + "/".join(top + second))
            adding += """ encrypt = true\n"""
        self.keys_for_terraform = []
        if not adding:
            return (returnCodes.RC_UNCHANGED,)
        rc = returnCodes.RC_REPLACE_LINE.value
        if extra_output:
            # Ask for the extra file only when there is something to put in
            # it; otherwise an empty "auto_" file would be written.
            rc |= returnCodes.RC_NEW_FILE.value
        # handle_file terminates the replacement with a newline itself, so
        # the one carried by the original line is dropped here.
        return (rc, adding + line.rstrip("\n"), extra_output)

    def sub_cb(self, thing_stack, line):
        """Record keys of the s3 backend block and fix the bucket name.

        Args:
            thing_stack: token lists, innermost first.
            line: the original line.

        Returns:
            (returnCodes.RC_REPLACE_LINE, new_line) when a `bucket` value
            other than "terraform-state-us-east-1" was replaced, otherwise
            (returnCodes.RC_UNCHANGED,).
        """
        if len(thing_stack) == 3:
            top = thing_stack[2]
            second = thing_stack[1]
            third = thing_stack[0]
            if not top or not top[0] == "terraform":
                return (returnCodes.RC_UNCHANGED,)
            if len(second) < 2 or second[0] != "backend" or second[1] != '"s3"':
                return (returnCodes.RC_UNCHANGED,)
            if not third:
                return (returnCodes.RC_UNCHANGED,)
            self.keys_for_terraform.append(third[0])
            # third is [key, "=", value] for `key = value`; shorter lists
            # carry no value to replace.
            if third[0] == "bucket" and len(third) >= 3:
                if third[2] != '"terraform-state-us-east-1"':
                    print ("3/" + self.file_name + "/" + "/".join(top + second + third))
                    new_line = line.replace(third[2],
                                            '"terraform-state-us-east-1"')
                    # handle_file adds the line terminator itself
                    return (returnCodes.RC_REPLACE_LINE, new_line.rstrip("\n"))
        return (returnCodes.RC_UNCHANGED,)
if __name__ == "__main__":
    import sys

    # The tree to process may be given as the first command-line argument;
    # without one the previous fixed location is used.
    handle_file(sys.argv[1] if len(sys.argv) > 1 else "../../terraform",
                CallBacks())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment