Skip to content

Instantly share code, notes, and snippets.

@juanbrny
Created January 9, 2024 21:22
Show Gist options
  • Save juanbrny/5ec4c68bfef677985e4f3caa831b95c9 to your computer and use it in GitHub Desktop.
Save juanbrny/5ec4c68bfef677985e4f3caa831b95c9 to your computer and use it in GitHub Desktop.
Python script to get all members of a given fleet clustergroup
import json
import subprocess
import sys
# Fleet ClusterGroup membership is dynamic, based on the rules in the ClusterGroup definition.
# Those rules are either a matchExpressions selector, a matchLabels selector, or a combination of both.
# This script takes the ClusterGroup name as a parameter and gets the rule definition in JSON format.
# Those rules are then formatted for use in a kubectl command that finds all the Fleet ClusterGroup
# members that match those rules and shows them on standard output.
def run_kubectl_command(cluster_group_name):
    """Fetch a Fleet ClusterGroup from the ``fleet-default`` namespace.

    Runs ``kubectl get clustergroup.fleet.cattle.io <name> -o json`` and
    decodes what kubectl prints.

    Args:
        cluster_group_name: Name of the ClusterGroup resource to look up.

    Returns:
        The decoded JSON document printed by kubectl.

    Raises:
        SystemExit: With status 1, after writing a message to stderr, when
            kubectl cannot be started, exits with a non-zero status, or
            prints output that is not valid JSON.
    """
    cmd = [
        'kubectl', 'get', 'clustergroup.fleet.cattle.io', cluster_group_name,
        '--namespace', 'fleet-default', '-o', 'json',
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # kubectl is missing from PATH or is not executable; report it the
        # same way as any other kubectl failure instead of a raw traceback.
        print("Error running kubectl command:", exc, file=sys.stderr)
        sys.exit(1)

    if result.returncode != 0:
        # Diagnostics go to stderr so stdout only ever carries results.
        print("Error running kubectl command:", result.stderr, file=sys.stderr)
        sys.exit(1)

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        print("Error parsing kubectl output:", exc, file=sys.stderr)
        sys.exit(1)
def extract_match_labels(selector_json):
    """Render the ``matchLabels`` part of a selector as ``k=v,k2=v2``.

    Args:
        selector_json: The selector mapping; its optional ``matchLabels``
            entry maps label names to required values.

    Returns:
        A comma-separated string of ``key=value`` pairs, or an empty string
        when ``matchLabels`` is absent, null, or empty.
    """
    # ``or {}`` also covers an explicit ``"matchLabels": null`` in the JSON,
    # which a plain ``.get(key, {})`` default would pass through as None.
    labels = selector_json.get('matchLabels') or {}
    return ','.join(f'{k}={v}' for k, v in labels.items())
def extract_match_expressions(selector_json):
    """Render the ``matchExpressions`` part of a selector for ``kubectl -l``.

    Each expression is translated as follows:

    * ``In``           -> ``key in (v1,v2)``
    * ``NotIn``        -> ``key notin (v1,v2)``
    * ``Exists``       -> ``key``
    * ``DoesNotExist`` -> ``!key``
    * ``<`` / ``>``    -> ``key < v`` / ``key > v`` (first value only)

    Expressions with any other operator are skipped.

    Args:
        selector_json: The selector mapping; its optional
            ``matchExpressions`` entry is a list of mappings with ``key``,
            ``operator`` and (for value-based operators) ``values``.

    Returns:
        The translated expressions joined with commas, or an empty string
        when there are none.
    """
    expressions = []
    # ``or []`` also covers an explicit ``"matchExpressions": null``.
    for expr in selector_json.get('matchExpressions') or []:
        key = expr['key']
        operator = expr['operator']
        # Exists / DoesNotExist carry no ``values`` field, so it must be
        # read leniently rather than with ``expr['values']``.
        values = expr.get('values') or []
        if operator == 'In':
            expressions.append(f'{key} in ({",".join(values)})')
        elif operator == 'NotIn':
            expressions.append(f'{key} notin ({",".join(values)})')
        elif operator == 'Exists':
            expressions.append(key)
        elif operator == 'DoesNotExist':
            expressions.append(f'!{key}')
        elif operator in ('<', '>') and values:
            expressions.append(f'{key} {operator} {values[0]}')
    return ','.join(expressions)
def main(cluster_group_name):
    """Print the names of the Fleet clusters matched by a ClusterGroup.

    Reads the ClusterGroup's ``spec.selector``, converts its ``matchLabels``
    and ``matchExpressions`` into a single label selector, and runs
    ``kubectl get clusters.fleet.cattle.io -l <selector>`` in the
    ``fleet-default`` namespace, letting kubectl write the ``MEMBERS``
    column straight to standard output.

    Args:
        cluster_group_name: Name of the ClusterGroup whose members to list.

    Raises:
        SystemExit: With kubectl's exit status when the member lookup
            fails.
    """
    cluster_group = run_kubectl_command(cluster_group_name)
    # A ClusterGroup without a selector has no ``spec.selector`` key (or a
    # null one); treat that as "no rules" instead of raising KeyError.
    selector_json = (cluster_group.get('spec') or {}).get('selector') or {}

    match_labels = extract_match_labels(selector_json)
    match_expressions = extract_match_expressions(selector_json)
    selector = ','.join(filter(None, [match_labels, match_expressions]))

    if not selector:
        print("No valid selector found.")
        return

    # Pass an argument list (no shell): label keys and values come from
    # cluster data and must never be interpreted by a shell.
    kubectl_cmd = [
        'kubectl', 'get', 'clusters.fleet.cattle.io',
        '--namespace', 'fleet-default',
        '-o=custom-columns=MEMBERS:.metadata.name',
        '-l', selector,
    ]
    completed = subprocess.run(kubectl_cmd)
    if completed.returncode != 0:
        # Propagate kubectl's failure so callers and scripts can detect it.
        sys.exit(completed.returncode)
if __name__ == "__main__":
    # The script takes exactly one positional argument: the ClusterGroup name.
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("Usage: python script.py <ClusterGroup-Name>")
        sys.exit(1)
    main(cli_args[0])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment