Skip to content

Instantly share code, notes, and snippets.

@ImIOImI
Last active December 1, 2023 21:39
Show Gist options
  • Save ImIOImI/328aadacde5d939a61383ecadc1d2a08 to your computer and use it in GitHub Desktop.
Save ImIOImI/328aadacde5d939a61383ecadc1d2a08 to your computer and use it in GitHub Desktop.
Search All K8s Manifests for A String
#!/usr/bin/env python3
import subprocess
import argparse
import os
def get_resource_types():
cmd = ["kubectl", "api-resources", "-o", "name"]
output = subprocess.check_output(cmd, text=True)
resource_types = output.strip().split('\n')
return resource_types
def search_and_save_manifests(resource_types, search_string):
results_folder = "results"
os.makedirs(results_folder, exist_ok=True)
# Get the total number of resource types
total_resource_types = len(resource_types)
for type_count, resource in enumerate(resource_types):
try:
print(f"Processing resource type {resource}: {type_count + 1} of {total_resource_types}")
cmd = ["kubectl", "get", resource, "--all-namespaces", "-o", "yaml"]
# Get a single yaml file with all the manifests for the resource type. This saves a LOT of kubectl calls
# and time. Its MUCH faster to narrow down the resource type and then getting and searching individual
# manifest files one by one
# all_type_manifests = subprocess.check_output(cmd, text=True).strip().split("\n---\n")
try:
all_type_manifests = subprocess.check_output(cmd, text=True).strip().split("\n---\n")
except subprocess.CalledProcessError as e:
print(f"Error: {e}")
all_type_manifests = ""
for idx, type_manifest in enumerate(all_type_manifests):
# Check if the search string is in the manifest
if search_string in type_manifest:
# we found the string, so break down the huge manifest file into individual resources
cmd = ["kubectl", "get", resource, "-o", "custom-columns=:metadata.name", "--no-headers"]
names = subprocess.check_output(cmd, text=True).strip().split('\n')
for name in names:
cmd = ["kubectl", "get", resource, name, "-o", "yaml"]
manifest = subprocess.check_output(cmd, text=True).strip()
print(f"Manifest for {resource} {name} yaml len: {len(manifest)}")
# Check if the search string is in the manifest
if search_string in manifest:
# we found the string, so save the manifest
file_name = f"{name}.yaml"
results_folder = f"results/{search_string}/{resource}"
os.makedirs(results_folder, exist_ok=True)
with open(os.path.join(results_folder, file_name), "w") as file:
print(f"########### Writing manifest for resource {resource} {name} to file")
file.write(f"# {resource} {name} Manifest {idx}\n")
file.write(manifest)
except subprocess.CalledProcessError as e:
print(f"Error getting manifests for resource {resource}: {e}")
continue
def main():
parser = argparse.ArgumentParser(description="Search and save Kubernetes manifests containing a specific string.")
parser.add_argument("search_string", help="String to search for in manifests.")
args = parser.parse_args()
resource_types = get_resource_types()
search_and_save_manifests(resource_types, args.search_string)
if __name__ == "__main__":
main()
@ImIOImI
Copy link
Author

ImIOImI commented Dec 1, 2023

This is a weapon of last resort. May God have mercy on your soul if it comes to this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment