Created
May 3, 2024 20:25
-
-
Save bukzor/022906db533fb4ea374f498221b72a11 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from __future__ import annotations | |
import re | |
from typing import TYPE_CHECKING | |
from typing import TypeVar | |
from lib.json import JSONArray | |
from lib.json import JSONObject | |
from lib.json import deepget as json_deepget | |
from lib.json import dumps as json_dumps | |
from lib.json import loads as json_loads | |
from lib.naming import Name | |
from lib.unset import UNSET | |
from lib.unset import Unset | |
if TYPE_CHECKING: | |
from lib import json | |
T = TypeVar("T") | |
name_re = re.compile("(^|[A-Z]+)[a-z0-9]*") | |
del re # assertion: that's is the only regex | |
def get_log_name(logline: json.Object) -> str:
    """Return a short, readable form of a log entry's ``logName``.

    Keeps only the text after the last "/" of ``logName``, rewrites the
    first ``.googleapis.com%2F`` to "/" and then turns every remaining
    ``%2F`` into "/".
    """
    full_name = gcp_json_get(logline, str, "logName")
    last_segment = full_name.rsplit("/", 1)[-1]
    without_domain = last_segment.replace(".googleapis.com%2F", "/", 1)
    return without_domain.replace("%2F", "/")
def name_to_snakecase(name: str) -> str:
    """Fuse the "_"-separated parts of *name* into a single word.

    The first part is lowercased and each later part is title-cased, so --
    despite this function's name -- "resource_name" becomes "resourceName".
    """
    head, *tail = name.split("_")
    return head.lower() + "".join(part.title() for part in tail)
def gcp_json_get(
    logline: json.Object,
    result_type: type[T],
    key: str,
    *,
    default: Unset | T = UNSET,
) -> T:
    """Compensate for GCP's silly decision to sometimes convert json keys to
    camelCase and sometimes leave them snake_case.

    Tries ``str(Name(key))``, then its camelCase form, then its snake_case
    form (duplicates skipped) and returns the first value that
    ``lib.json.get`` produces for *result_type*.

    If every spelling fails, *default* is returned when one was supplied;
    otherwise the error raised for the first spelling tried is re-raised.
    """
    from lib.json import get as json_get

    name = Name(key)
    # dict.fromkeys de-duplicates while preserving insertion order.  A plain
    # set would make the lookup order -- and therefore which error is
    # reported, and which value wins when two spellings are both present --
    # depend on string hashing.
    spellings = dict.fromkeys(
        str(variant)
        for variant in (name, name.camelCase(), name.snake_case())
    )

    first_error: Exception | None = None
    for spelling in spellings:
        try:
            return json_get(logline, result_type, spelling)
        except Exception as error:
            # Remember only the earliest failure; later spellings are
            # fallbacks and their errors are less informative.
            if first_error is None:
                first_error = error

    if default is UNSET:
        raise first_error or AssertionError
    return default
def get_resource_id(logline: json.Object) -> str:
    """Pick a short identifier for the resource a log entry belongs to.

    Reads the ``resource.labels`` object and returns the first non-empty
    value found under a fixed, ordered list of label names.  For
    ``build_id`` only the last nine characters are used.

    Raises AssertionError when a candidate label holds a non-string value,
    or when none of the candidate labels yields a non-empty value.
    """
    labels = json_deepget(logline, JSONObject, "resource", "labels")
    preferred_labels = (
        "revision_name",
        "build_id",
        "function_name",
        "service_name",
        "project_id",
    )
    for label in preferred_labels:
        if label in labels:
            value = labels[label]
            assert isinstance(value, str), value
            resource_id = value[-9:] if label == "build_id" else value
            if resource_id:
                return resource_id
    raise AssertionError("found no resource id", json_dumps(labels))
def get_messages(logline: json.Object) -> list[str]:
    """Collect the non-empty free-text messages carried by a log entry.

    Looks at ``textPayload`` first and then, when a non-empty
    ``jsonPayload`` object is present, at its ``message``, ``MESSAGE`` and
    ``msg`` entries (in that order).  Missing or empty values are dropped.
    """
    candidates = [gcp_json_get(logline, str, "textPayload", default="")]
    json_payload = gcp_json_get(
        logline, JSONObject, "jsonPayload", default=None
    )
    if json_payload:
        candidates.extend(
            gcp_json_get(json_payload, str, key, default="")
            for key in ("message", "MESSAGE", "msg")
        )
    return list(filter(None, candidates))
def get_rpc_authz(proto: json.Object) -> json.Object:
    """Return the single ``authorizationInfo`` entry of *proto*.

    Returns an empty dict when the list is absent or empty.  Raises
    AssertionError when the list has more than one entry, or when its only
    entry is not a dict.
    """
    authz_info: json.Array = gcp_json_get(
        proto, JSONArray, "authorizationInfo", default=[]
    )
    entry_count = len(authz_info)
    if entry_count == 0:
        return {}
    if entry_count != 1:
        raise AssertionError("multiple authz?")

    only_entry = authz_info[0]
    assert isinstance(only_entry, dict)
    return only_entry
def get_rpc_method_names(proto: json.Object) -> str:
    """Name the RPC that *proto* describes.

    Prefers the ``permission`` of the authorization entry.  When that lookup
    raises KeyError, falls back to "<service>.<method>", where <service> is
    ``serviceName`` with ".googleapis.com" removed and <method> is the text
    after the last "." of ``methodName``.
    """
    authz = get_rpc_authz(proto)
    try:
        return gcp_json_get(authz, str, "permission")
    except KeyError:
        # No permission available: build the name from service and method.
        pass

    service = gcp_json_get(proto, str, "serviceName").replace(
        ".googleapis.com", ""
    )
    method = gcp_json_get(proto, str, "methodName").split(".")[-1]
    return f"{service}.{method}"
def get_rpc_response(proto: json.Object) -> str:
    """Summarize the outcome of the RPC that *proto* describes.

    Resolution order:

    1. the ``message`` of ``status`` or of ``response.status``, whichever is
       found first;
    2. "allow" / "deny" according to the ``granted`` flag of the
       authorization entry;
    3. for the first status object that was found without a message: its
       JSON dump when it has content, or "(success)" when it is empty.

    Raises AssertionError when none of the above applies.
    """
    statuses: list[json.Object] = []
    for deepkey in (("status",), ("response", "status")):
        try:
            status = json_deepget(proto, JSONObject, *deepkey)
        except KeyError:
            continue
        try:
            return gcp_json_get(status, str, "message")
        except KeyError:
            # Keep the message-less status as a fallback for step 3.
            statuses.append(status)

    authz = get_rpc_authz(proto)
    if (key := "granted") in authz:
        return "allow" if authz[key] else "deny"

    if statuses:
        first_status = statuses[0]
        # The dump must be *returned*: previously it was computed and thrown
        # away, so a non-empty status was never reported to the caller.
        return json_dumps(first_status) if first_status else "(success)"

    raise AssertionError("found no log message")
def get_rpc_object(proto: json.Object) -> str:
    """Return a short name for the resource an RPC acted on.

    The resource path comes from ``resourceName``; when that lookup raises
    KeyError, from the ``resource`` of the authorization entry.  The path is
    split on "/":

    * an odd number of segments, or exactly ``projects/<x>``: the last
      segment is returned;
    * any other even number of segments: the last two, joined by "/".
    """
    try:
        resource_name = gcp_json_get(proto, str, "resourceName")
    except KeyError:
        authz = get_rpc_authz(proto)
        resource_name = gcp_json_get(authz, str, "resource")

    segments = resource_name.split("/")
    segment_count = len(segments)
    is_bare_project = segment_count == 2 and segments[0] == "projects"
    if segment_count % 2 or is_bare_project:
        return segments[-1]
    if segment_count >= 2:
        return "/".join(segments[-2:])
    raise AssertionError(segments)
def get_rpc_principal_chain(proto: json.Object) -> list[str]:
    """List the principals behind an RPC, starting with the caller.

    Returns an empty list when the ``authenticationInfo`` lookup raises
    KeyError.  Otherwise the list starts with its ``principalEmail``,
    followed by ``firstPartyPrincipal.principalEmail`` of every
    ``serviceAccountDelegationInfo`` entry, in order.
    """
    try:
        authn: json.Object = gcp_json_get(
            proto, JSONObject, "authenticationInfo"
        )
    except KeyError:
        return []

    chain = [gcp_json_get(authn, str, "principalEmail")]
    # NOTE: this membership test only sees the camelCase spelling of the key.
    delegation_key = "serviceAccountDelegationInfo"
    if delegation_key in authn:
        chain.extend(
            json_deepget(
                delegation,
                str,
                "firstPartyPrincipal",
                "principalEmail",
            )
            for delegation in gcp_json_get(authn, JSONArray, delegation_key)
        )
    return chain
def summarize_principal(principal: str, project: str) -> str:
    """Abbreviate a principal's email address.

    Two trailing pieces are shortened, in this order: a final
    ".iam.gserviceaccount.com" is dropped, then a final "@<project>" is
    reduced to a bare "@".  Anything else is returned unchanged.
    """
    shortenings = (
        (".iam.gserviceaccount.com", ""),
        (f"@{project}", "@"),
    )
    for suffix, replacement in shortenings:
        if principal.endswith(suffix):
            principal = principal.removesuffix(suffix) + replacement
    return principal
def get_project(logline: json.Object) -> str:
    """Return the second "/"-separated segment of the entry's ``logName``.

    Raises AssertionError unless the first segment is "projects".
    """
    segments = gcp_json_get(logline, str, "logName").split("/")
    assert segments[0] == "projects", segments
    return segments[1]
def summarize_principal_chain(proto: json.Object, project: str) -> list[str]:
    """Render the RPC's principal chain as a list of string fragments.

    Returns an empty list when there are no principals.  Otherwise the
    result is a leading " ", the abbreviated first principal, and then a
    "<-" fragment plus abbreviated principal for each remaining one.
    """
    principal_chain = get_rpc_principal_chain(proto)
    if not principal_chain:
        return []

    caller, *delegates = principal_chain
    fragments = [" ", summarize_principal(caller, project)]
    for delegate in delegates:
        fragments += ["<-", summarize_principal(delegate, project)]
    return fragments
def summarize_proto(proto: json.Object, project: str) -> list[str]:
    """Describe an RPC payload as string fragments ready to be concatenated.

    The fragments spell out
    "RPC: <method>: <response> (<object>[ <principal chain>])".
    """
    return [
        "RPC: ",
        # get_rpc_method_names returns a single str; it must not be
        # star-unpacked, which would scatter it into one-character items.
        get_rpc_method_names(proto),
        ": ",
        get_rpc_response(proto),
        " (",
        get_rpc_object(proto),
        *summarize_principal_chain(proto, project),
        ")",
    ]
def summarize_http_request(http_request: json.Object) -> list[str]:
    """Describe an HTTP request record as string fragments.

    The fragments spell out "HTTP: <status> <method> <url> <remote ip>".
    When the ``status`` lookup raises KeyError, "RST" stands in for it.
    """
    try:
        status = str(gcp_json_get(http_request, int, "status"))
    except KeyError:
        status = "RST"

    method = gcp_json_get(http_request, str, "requestMethod")
    url = gcp_json_get(http_request, str, "requestUrl")
    remote_ip = gcp_json_get(http_request, str, "remoteIp")
    return ["HTTP: ", status, " ", method, " ", url, " ", remote_ip]
def json_to_message(logline: json.Object) -> str:
    """Condense one log entry into a single line of text.

    The line starts with "<timestamp> [<log name>:<resource id>] " and
    continues with the first of these that is available:

    * the entry's text messages, concatenated without a separator;
    * a summary of its ``protoPayload``;
    * a summary of its ``httpRequest``;
    * its "authorization.k8s.io/reason" label.

    Raises AssertionError when none of them is present.
    """
    timestamp = gcp_json_get(logline, str, "timestamp")
    log_name = get_log_name(logline)
    resource_id = get_resource_id(logline)
    pieces: list[str] = [timestamp, " [", log_name, ":", resource_id, "] "]

    labels = gcp_json_get(logline, JSONObject, "labels", default={})
    project = get_project(logline)
    reason_label = "authorization.k8s.io/reason"

    if text_messages := get_messages(logline):
        pieces += text_messages
    elif proto_payload := gcp_json_get(
        logline, JSONObject, "protoPayload", default=None
    ):
        pieces += summarize_proto(proto_payload, project)
    elif http_request := gcp_json_get(
        logline, JSONObject, "httpRequest", default=None
    ):
        pieces += summarize_http_request(http_request)
    elif reason_label in labels:
        pieces.append(gcp_json_get(labels, str, reason_label))
    else:
        raise AssertionError("Unable to summarize this log entry!")

    return "".join(pieces)
def main():
    """Summarize each JSON log entry read from standard input.

    Every input line is parsed as JSON and must be an object.  Its one-line
    summary is printed JSON-encoded.  When summarizing fails, a JSON-encoded
    "<ErrorType>: <message>" line is printed instead, followed by the
    original input line.
    """
    import sys

    for raw_line in sys.stdin:
        # Parsing and the type check sit outside the try block, so a line
        # that is not a JSON object propagates its error to the caller.
        logline = json_loads(raw_line)
        assert isinstance(logline, dict)
        try:
            summary = json_to_message(logline)
            print(json_dumps(summary), flush=True)
        except Exception as error:
            failure = f"{type(error).__name__}: {error}"
            print(json_dumps(failure))
            print(raw_line, flush=True)
# Run only when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment