Created
January 9, 2024 21:22
-
-
Save juanbrny/5ec4c68bfef677985e4f3caa831b95c9 to your computer and use it in GitHub Desktop.
Python script to get all members of a given fleet clustergroup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import subprocess | |
import sys | |
# Fleet clustergroup membership is dynamic, based on the rules in the clustergroup definition.
# Those rules are either a matchExpressions selector, a matchLabels selector or a combination of both.
# This script takes the clustergroup name as a parameter and gets the rule definition in JSON format.
# Those rules are then formatted for use in a kubectl command that finds all the fleet clustergroup
# members that match those rules and shows them on standard output.
def run_kubectl_command(cluster_group_name, namespace='fleet-default'):
    """Fetch a Fleet ClusterGroup definition with kubectl and return it parsed.

    Args:
        cluster_group_name: Name of the ``clustergroup.fleet.cattle.io``
            resource to read.
        namespace: Namespace to query. Defaults to ``fleet-default``, the
            value this script has always used.

    Returns:
        The object decoded from the JSON printed by
        ``kubectl get ... -o json``.

    The process exits with status 1, after printing the reason to stderr,
    when kubectl cannot be started, when it returns a non-zero exit code,
    or when its output is not valid JSON.
    """
    cmd = [
        'kubectl', 'get', 'clustergroup.fleet.cattle.io', cluster_group_name,
        '--namespace', namespace, '-o', 'json',
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as err:
        # Raised when the kubectl binary is missing or not executable; report
        # it the same way as any other kubectl failure instead of a traceback.
        print("Error running kubectl command:", err, file=sys.stderr)
        sys.exit(1)

    if result.returncode != 0:
        print("Error running kubectl command:", result.stderr, file=sys.stderr)
        sys.exit(1)

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as err:
        print("Error parsing kubectl output as JSON:", err, file=sys.stderr)
        sys.exit(1)
def extract_match_labels(selector_json):
    """Render a selector's ``matchLabels`` as a kubectl ``-l`` fragment.

    Args:
        selector_json: The selector mapping taken from the ClusterGroup
            spec. May be ``None`` or lack ``matchLabels``.

    Returns:
        Comma-separated ``key=value`` pairs, or an empty string when there
        are no labels to match.
    """
    # ``or {}`` also covers an explicit JSON null, which ``.get(key, {})``
    # would pass through as None.
    labels = (selector_json or {}).get('matchLabels') or {}
    return ','.join(f'{k}={v}' for k, v in labels.items())
def extract_match_expressions(selector_json):
    """Render a selector's ``matchExpressions`` as a kubectl ``-l`` fragment.

    Args:
        selector_json: The selector mapping taken from the ClusterGroup
            spec. May be ``None`` or lack ``matchExpressions``.

    Returns:
        Comma-separated set-based requirements, or an empty string when
        nothing could be translated. Operators are translated as follows:
        ``In`` to ``key in (a,b)``, ``NotIn`` to ``key notin (a,b)``,
        ``Exists`` to ``key``, ``DoesNotExist`` to ``!key``, and ``<`` or
        ``>`` to ``key < value`` or ``key > value``. Expressions with any
        other operator are skipped.

    Raises:
        KeyError: If an expression lacks ``key`` or ``operator``.
    """
    expressions = []
    for expr in (selector_json or {}).get('matchExpressions') or []:
        key = expr['key']
        operator = expr['operator']
        # Exists / DoesNotExist carry no values, so the field may be absent.
        values = expr.get('values') or []
        if operator == 'In':
            expressions.append(f'{key} in ({",".join(values)})')
        elif operator == 'NotIn':
            expressions.append(f'{key} notin ({",".join(values)})')
        elif operator == 'Exists':
            expressions.append(key)
        elif operator == 'DoesNotExist':
            expressions.append(f'!{key}')
        elif operator in ('<', '>') and values:
            # Only the first value is meaningful for a comparison.
            expressions.append(f'{key} {operator} {values[0]}')
    return ','.join(expressions)
def main(cluster_group_name):
    """Print the names of the Fleet clusters that belong to a ClusterGroup.

    Reads the ClusterGroup's selector, turns its ``matchLabels`` and
    ``matchExpressions`` into one label selector, and runs
    ``kubectl get clusters.fleet.cattle.io -l <selector>`` in the
    ``fleet-default`` namespace. kubectl writes the result to stdout.

    Args:
        cluster_group_name: Name of the ClusterGroup to resolve.

    Prints "No valid selector found." when the group defines no usable
    selector. Exits with kubectl's status if the cluster listing fails.
    """
    cluster_group = run_kubectl_command(cluster_group_name)
    # ``spec`` or ``selector`` may be missing or null; treat both as "no rules".
    selector_json = (cluster_group.get('spec') or {}).get('selector') or {}

    match_labels = extract_match_labels(selector_json)
    match_expressions = extract_match_expressions(selector_json)
    selector = ','.join(filter(None, [match_labels, match_expressions]))

    if not selector:
        print("No valid selector found.")
        return

    # The argv is passed as a list with no shell, so label keys and values
    # coming from the cluster can never be interpreted as shell syntax.
    kubectl_cmd = [
        'kubectl', 'get', 'clusters.fleet.cattle.io',
        '--namespace', 'fleet-default',
        '-o=custom-columns=MEMBERS:.metadata.name',
        '-l', selector,
    ]
    completed = subprocess.run(kubectl_cmd)
    if completed.returncode != 0:
        # kubectl has already written its own error to stderr.
        sys.exit(completed.returncode)
if __name__ == "__main__":
    # Script entry point: exactly one argument, the ClusterGroup name, is
    # expected after the script path.
    if len(sys.argv) == 2:
        main(sys.argv[1])
    else:
        print("Usage: python script.py <ClusterGroup-Name>")
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment