Skip to content

Instantly share code, notes, and snippets.

@AfvanMoopen
Last active December 16, 2022 11:46
Show Gist options
  • Save AfvanMoopen/e50b87fa39f44954c79e35bafb67fdf9 to your computer and use it in GitHub Desktop.
Save AfvanMoopen/e50b87fa39f44954c79e35bafb67fdf9 to your computer and use it in GitHub Desktop.
import getopt
import google.cloud.logging
import sys
from google.protobuf.json_format import MessageToDict
from google.cloud import resourcemanager_v3
from google.oauth2 import service_account
# OAuth scope requested for the service-account credentials below.
SCOPE = ["https://www.googleapis.com/auth/cloud-platform"]

# Credentials are read from the key file "secret.json" in the working directory.
credentials = service_account.Credentials.from_service_account_file(
    "secret.json",
    scopes=SCOPE,
)

# Resource Manager client; get_projects() uses it to search projects.
resource_mgr = resourcemanager_v3.ProjectsClient(credentials=credentials)
# Read the service account to search for from the ``-l <email>`` option.
# ``srv_acc`` starts out empty so that a missing ``-l`` is reported with the
# intended message instead of surfacing later as a NameError.
srv_acc = ""
options, remainder = getopt.getopt(sys.argv[1:], "l:")
for opt, arg in options:
    if opt == "-l":
        srv_acc = arg

# Validate explicitly: an ``assert`` would be stripped when running with -O.
if not srv_acc:
    raise SystemExit("specify Service account")

print(f"The service account specified is {srv_acc}")
def get_projects():
    """Return the ``project_id`` of every project found by ``resource_mgr``.

    Iterates over ``resource_mgr.search_projects()`` and collects the IDs
    into a list, in the order the client yields them.
    """
    return [found.project_id for found in resource_mgr.search_projects()]
def match_emails(response):
    """Tell whether a log entry was produced by the service account ``srv_acc``.

    The principal is looked up at
    ``response.payload["authenticationInfo"]["principalEmail"]``.

    Args:
        response: A log entry object exposing a ``payload`` attribute.

    Returns:
        True if the entry's principal e-mail equals the module-level
        ``srv_acc``; False otherwise, including when the payload has no
        authentication info (or is not a mapping at all).
    """
    try:
        email = response.payload.get("authenticationInfo").get("principalEmail")
    except AttributeError:
        # payload is None / not a mapping, or "authenticationInfo" is absent.
        return False
    # A missing principalEmail must not be stringified to "None" and compared.
    return email is not None and str(email) == srv_acc
def main():
    """Print the log entries whose principal matches the service account.

    Creates a Cloud Logging client with the module-level ``credentials``,
    iterates over ``client.list_entries()`` and keeps only the entries
    accepted by ``match_emails``. The resulting list is printed.
    """
    client = google.cloud.logging.Client(credentials=credentials)
    # Filter while iterating so that non-matching entries are never buffered.
    res = [entry for entry in client.list_entries() if match_emails(entry)]
    print(res)
    # TODO: report selected fields per entry instead of the raw objects, e.g.:
    # for r in res:
    #     log_name = r.log_name
    #     severity = r.severity
    #     timestamp = r.timestamp
    #     resource_info = (r.resource.get("type"), r.resource.get("labels"))
    #     operation = r.operation
    #     type_of_log = r.payload.get("@type")
    #     auth = r.payload.get("authenticationInfo")
    #     principalDetails = (
    #         auth.get("principalEmail"),
    #         auth.get("serviceAccountKeyName"),
    #         auth.get("principalSubject"),
    #     )
    #     status = r.payload.get("status")
# Run the search only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment