Created
September 20, 2021 18:01
-
-
Save kmcquade/0e26d3841d1c063cd7ecc9df0f132128 to your computer and use it in GitHub Desktop.
Run Terraform get recursively using python-terraform
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
python_terraform==0.10.1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
import os | |
import logging | |
import argparse | |
import sys | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from python_terraform import Terraform | |
import fnmatch | |
import shutil | |
logger = logging.getLogger(__name__) | |
END = "\033[0m" | |
GREY = "\33[90m" | |
def terraform_get(tf_directory: str, thread_count: int = 3, clean_existing: bool = False, update_terraform_modules: bool = False): | |
repo_directory = os.path.abspath(tf_directory) | |
if not os.path.exists(repo_directory): | |
raise Exception(f"Please provide a valid directory. Directory provided: {repo_directory}") | |
if clean_existing: | |
print("Removing any existing .terraform directories...") | |
# Remove any existing .terraform directories | |
purge(directory=repo_directory, pattern="*.terraform") | |
# Create a list of directory types to skip | |
unfiltered_subfolders = [x[0] for x in os.walk(repo_directory)] | |
exclude_directory_patterns = [ | |
".git", ".idea", ".DSStore", ".venv", ".serverless", "node_modules", ".terraform" | |
] | |
all_subfolders = [] | |
for sub in unfiltered_subfolders: | |
for pattern in exclude_directory_patterns: | |
if pattern not in sub: | |
all_subfolders.append(sub) | |
# Find subfolders that have Terraform files in it (not including .terraform folders) | |
subfolders_with_terraform = set() | |
for subfolder in all_subfolders: | |
file_list = get_filenames_in_folder(subfolder) | |
if does_file_list_contain_terraform_files(file_list): | |
subfolders_with_terraform.add(subfolder) | |
subfolders_with_terraform = list(subfolders_with_terraform) | |
if update_terraform_modules: | |
threads = [] | |
# First, let's run this in the target directory | |
# print("Running 'terraform get' on subdirectories...") | |
print(f"Running 'terraform get' on the root directory: {os.path.relpath(repo_directory)}") | |
run_tf_get_command(repo_directory, update=True) | |
# Sometimes there are not Terraform repos inside the main directory, but they are in nested directories. | |
# Let's run Terraform get for those cases as well. | |
print(f"Running 'terraform get' on all Terraform modules nested under the folder: {os.path.relpath(repo_directory)}") | |
with ThreadPoolExecutor(max_workers=thread_count) as executor: | |
for index, subfolder in enumerate(subfolders_with_terraform): | |
threads.append(executor.submit(run_tf_get_command, tf_directory=subfolder, update=True)) | |
stuff = as_completed(threads) | |
for index, task in enumerate(stuff): | |
response, return_code, tf_dir = task.result() | |
if return_code > 0: | |
response_string = f"\texit {return_code}: {response}" | |
else: | |
response_string = "\texit 0" | |
print_grey_replace_output(f"\t[COMPLETED] terraform get ({index + 1}/{len(subfolders_with_terraform)}): {os.path.relpath(tf_dir)}{response_string}") | |
def run_tf_get_command(tf_directory: str, update: bool = False): | |
tf = Terraform(working_dir=tf_directory) | |
os.environ["TF_IN_AUTOMATION"] = "1" | |
if update: | |
update_str = "-update" | |
else: | |
update_str = None | |
os.environ["TF_DATA_DIR"] = ".terraform" # Workaround to https://github.com/bridgecrewio/checkov/issues/1312 | |
return_code, stdout, stderr = tf.cmd("get", "-no-color", update_str) | |
return stdout, return_code, tf_directory | |
def purge(directory: str, pattern: str, exclude: list = None) -> None: | |
""" | |
Usage: | |
purge(".", "*.tfstate") | |
purge(".", "*.tfstate.backup") | |
purge(".", "*.terraform") | |
purge(".", FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS) | |
:return: | |
""" | |
if not exclude: | |
exclude = [] | |
for root, dirnames, filenames in os.walk(directory): | |
dirnames[:] = [d for d in dirnames if d not in exclude] | |
for filename in fnmatch.filter(filenames, pattern): | |
f = os.path.join(root, filename) | |
os.remove(f) | |
for dirname in fnmatch.filter(dirnames, pattern): | |
d = os.path.join(root, dirname) | |
shutil.rmtree(d) | |
def get_filenames_in_folder(path: str) -> list: | |
"""Given a folder path, get a list of filenames in that folder.""" | |
file_list = [] | |
try: | |
for filename in os.listdir(path): | |
if os.path.isfile(os.path.join(path, filename)): | |
if filename not in file_list: | |
file_list.append(filename) | |
file_list.sort() | |
return file_list | |
except FileNotFoundError: | |
return [] | |
def does_file_list_contain_terraform_files(file_list: list) -> bool: | |
result = False | |
for file in file_list: | |
if file.endswith(".tf"): | |
result = True | |
break | |
return result | |
def print_grey_replace_output(string): | |
sys.stdout.write(f"\r{GREY}{string}{END}") | |
sys.stdout.flush() | |
def print_grey(string): | |
print(f"{GREY}{string}{END}") | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-d", "--directory", help="Base directory containing the Terraform files") | |
parser.add_argument("-t", "--threads", type=int, default=3, help="Thread count") | |
args = parser.parse_args() | |
this_tf_dir = os.path.abspath(args.directory) | |
terraform_get(tf_directory=this_tf_dir, thread_count=args.threads, update_terraform_modules=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment