# Source: GitHub Gist AfvanMoopen/c7e82cc8034c530296bd5df28f9382be by @AfvanMoopen.
# Last active December 16, 2022 08:14.
from google.oauth2 import service_account
from google.cloud import resourcemanager_v3
import collections
import logging
import time
import re
from google.cloud import recommender
# Module-level logger; the root logger is configured to emit INFO and above.
logger = logging.getLogger(__name__)
logging.basicConfig(level = logging.INFO)
logger.setLevel(logging.INFO)
# OAuth scope requested for the service-account credentials below.
SCOPE = ["https://www.googleapis.com/auth/cloud-platform"]
# Credentials are read from the file "secret.json" when this module is executed.
credentials = service_account.Credentials.from_service_account_file("secret.json", scopes = SCOPE)
# API clients that share those credentials.
resource_mgr = resourcemanager_v3.ProjectsClient(credentials=credentials)
rec_client = recommender.RecommenderClient(credentials=credentials)
# Accumulator that extract_details() appends to; it persists across calls.
final_recommendations = {"recommendations" : []}
def get_projects():
    """Return the project IDs reported by the resource manager.

    Iterates ``resource_mgr.search_projects()`` and collects the
    ``project_id`` of every project it yields.
    """
    return [entry.project_id for entry in resource_mgr.search_projects()]
def recom_data(project_id):
    """Fetch the IAM policy recommendations for one project.

    Args:
        project_id: ID of the project whose recommendations are requested.

    Returns:
        The iterable produced by ``rec_client.list_recommendations`` for the
        project's global ``google.iam.policy.Recommender``, or an empty list
        when the request raised, so callers can always iterate the result.
    """
    parent = (
        f"projects/{project_id}/locations/global/"
        "recommenders/google.iam.policy.Recommender"
    )
    try:
        return rec_client.list_recommendations(parent=parent)
    except Exception:
        # Errors were already swallowed here; keep that, but log the
        # traceback and hand back something iterable instead of None.
        logger.exception(
            "Could not list recommendations for project %s", project_id
        )
        return []
def verify_srv_acc(email, project_id):
    """Check whether a member string belongs to the given project.

    The member is considered a match when it contains ``@<project_id>``
    followed by a dot or the end of the string, so ``my-proj`` no longer
    matches an address under ``my-proj-2``.

    Args:
        email: Member/e-mail string to inspect. Non-string values never match.
        project_id: Project ID expected right after the ``@``.

    Returns:
        The ``re.Match`` object on success, otherwise ``None`` (callers rely
        on its truthiness).
    """
    if not isinstance(email, str):
        return None
    # Escape the ID so characters such as "." are matched literally, and
    # require a boundary so the ID is not accepted as a mere prefix.
    pattern = re.escape("@" + project_id) + r"(?=\.|$)"
    return re.search(pattern, email)
def extract_details(data, project_id):
    """Pull action, role and member out of recommender results.

    Every operation found in ``data`` is appended to the module-level
    ``final_recommendations["recommendations"]`` list as a dict with the keys
    ``"action"``, ``"role"`` and ``"service_account"``.

    Args:
        data: Iterable of recommendations (each exposing
            ``content.operation_groups``, ``name`` and ``etag``). ``None`` is
            treated as "no recommendations".
        project_id: Project the recommendations belong to; passed to
            ``verify_srv_acc``.

    Returns:
        A tuple ``(final_recommendations, update_data)`` where
        ``final_recommendations`` is the shared module-level accumulator and
        ``update_data`` holds one ``(name, etag)`` pair per recommendation
        processed in this call.
    """
    # NOTE(review): the original indentation was lost; this assumes every
    # operation is recorded and only def_acc depends on verify_srv_acc.
    def_acc = set()
    update_data = []
    if data is None:
        # Callers may pass None when fetching the recommendations failed.
        data = ()
    for recommendation in data:
        for op_group in recommendation.content.operation_groups:
            for op in op_group.operations:
                try:
                    srv_email = op.path_filters["/iamPolicy/bindings/*/members/*"]
                except KeyError:
                    # No member filter on this operation: use its value.
                    srv_email = op.value
                if not verify_srv_acc(srv_email, project_id):
                    def_acc.add(srv_email)
                final_recommendations["recommendations"].append({
                    "action": op.action,
                    "role": op.path_filters["/iamPolicy/bindings/*/role"],
                    "service_account": srv_email,
                })
        update_data.append((recommendation.name, recommendation.etag))
    if def_acc:
        logger.debug(
            "The Default service accounts are %s",
            ",".join(map(str, def_acc)),
        )
    return final_recommendations, update_data
if __name__ == "__main__":
    # Gather recommendations for every project first.
    for project_id in get_projects():
        rec_data = recom_data(project_id)
        if rec_data is None:
            # Nothing came back for this project; move on to the next one.
            continue
        extract_details(rec_data, project_id)

    # extract_details() accumulates into the module-level
    # final_recommendations, so report once after all projects are processed
    # rather than re-printing earlier projects on every iteration.
    for r in final_recommendations["recommendations"]:
        srv_acc = r.get("service_account")
        role = r.get("role")
        action = r.get("action")
        print(f"Service account - {srv_acc} - {action} the role {role}")
# (GitHub page footer: sign up or sign in on GitHub to comment on this gist.)