Skip to content

Instantly share code, notes, and snippets.

@bukzor
Created May 3, 2024 20:25
Show Gist options
  • Save bukzor/022906db533fb4ea374f498221b72a11 to your computer and use it in GitHub Desktop.
Save bukzor/022906db533fb4ea374f498221b72a11 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from __future__ import annotations
import re
from typing import TYPE_CHECKING
from typing import TypeVar
from lib.json import JSONArray
from lib.json import JSONObject
from lib.json import deepget as json_deepget
from lib.json import dumps as json_dumps
from lib.json import loads as json_loads
from lib.naming import Name
from lib.unset import UNSET
from lib.unset import Unset
if TYPE_CHECKING:
from lib import json
T = TypeVar("T")
name_re = re.compile("(^|[A-Z]+)[a-z0-9]*")
del re # assertion: that's is the only regex
def get_log_name(logline: json.Object) -> str:
return (
gcp_json_get(logline, str, "logName")
.rsplit("/", 1)[-1]
.replace(".googleapis.com%2F", "/", 1)
.replace("%2F", "/")
)
def name_to_snakecase(name: str) -> str:
parts = name.split("_")
return "".join([parts[0].lower(), *(part.title() for part in parts[1:])])
def gcp_json_get(
logline: json.Object,
result_type: type[T],
key: str,
*,
default: Unset | T = UNSET,
) -> T:
"""Compensate for GCP's silly decision to sometimes convert json keys to camelCase and sometimes leave them snake_case."""
name = Name(key)
first_error: Exception | None = None
for key in set(
str(name) for name in (name, name.camelCase(), name.snake_case())
):
try:
from lib.json import get as json_get
return json_get(logline, result_type, str(key))
except Exception as error:
if first_error is None:
first_error = error
else:
if default is UNSET:
raise first_error or AssertionError
else:
return default
def get_resource_id(logline: json.Object) -> str:
labels = json_deepget(logline, JSONObject, "resource", "labels")
for label in (
"revision_name",
"build_id",
"function_name",
"service_name",
"project_id",
):
if label not in labels:
continue
result = labels[label]
assert isinstance(result, str), result
if label == "build_id":
result = result[-9:]
if result:
return result
raise AssertionError("found no resource id", json_dumps(labels))
def get_messages(logline: json.Object) -> list[str]:
messages: list[str] = []
messages.append(gcp_json_get(logline, str, "textPayload", default=""))
payload = gcp_json_get(logline, JSONObject, "jsonPayload", default=None)
if payload:
for key in ("message", "MESSAGE", "msg"):
messages.append(gcp_json_get(payload, str, key, default=""))
return [message for message in messages if message]
def get_rpc_authz(proto: json.Object) -> json.Object:
authz_info: json.Array = gcp_json_get(
proto, JSONArray, "authorizationInfo", default=[]
)
match len(authz_info):
case 0:
return {}
case 1:
result = authz_info[0]
assert isinstance(result, dict)
return result
case _:
raise AssertionError("multiple authz?")
def get_rpc_method_names(proto: json.Object) -> str:
authz = get_rpc_authz(proto)
try:
return gcp_json_get(authz, str, "permission")
except KeyError:
pass
return ".".join(
(
gcp_json_get(proto, str, "serviceName").replace(
".googleapis.com", ""
),
gcp_json_get(proto, str, "methodName").split(".")[-1],
)
)
def get_rpc_response(proto: json.Object) -> str:
statuses: list[json.Object] = []
for deepkey in (("status",), ("response", "status")):
try:
status = json_deepget(proto, JSONObject, *deepkey)
except KeyError:
continue
try:
return gcp_json_get(status, str, "message")
except KeyError:
statuses.append(status)
authz = get_rpc_authz(proto)
if (key := "granted") in authz:
return "allow" if authz[key] else "deny"
for status in statuses:
if status:
json_dumps(status)
else:
return "(success)"
raise AssertionError("found no log message")
def get_rpc_object(proto: json.Object) -> str:
try:
resource_name = gcp_json_get(proto, str, "resourceName")
except KeyError:
authz = get_rpc_authz(proto)
resource_name = gcp_json_get(authz, str, "resource")
parts = resource_name.split("/")
if len(parts) % 2: # odd
return parts[-1]
elif len(parts) == 2 and parts[0] == "projects": # odd
return parts[-1]
elif len(parts) >= 2:
return "/".join(parts[-2:])
else:
raise AssertionError(parts)
def get_rpc_principal_chain(proto: json.Object) -> list[str]:
try:
authn: json.Object = gcp_json_get(
proto, JSONObject, "authenticationInfo"
)
except KeyError:
return []
result = [gcp_json_get(authn, str, "principalEmail")]
if (key := "serviceAccountDelegationInfo") in authn:
for delegation_info in gcp_json_get(authn, JSONArray, key):
result.append(
json_deepget(
delegation_info,
str,
"firstPartyPrincipal",
"principalEmail",
)
)
return result
def summarize_principal(principal: str, project: str) -> str:
for suffix, replacement in (
(".iam.gserviceaccount.com", ""),
(f"@{project}", "@"),
):
if principal.endswith(suffix):
principal = principal.rpartition(suffix)[0] + replacement
return principal
def get_project(logline: json.Object) -> str:
log_name = gcp_json_get(logline, str, "logName")
parts = log_name.split("/")
assert parts[0] == "projects", parts
return parts[1]
def summarize_principal_chain(proto: json.Object, project: str) -> list[str]:
principal_chain = get_rpc_principal_chain(proto)
if not principal_chain:
return []
principal_summary = [" "]
principal_summary.append(
summarize_principal(principal_chain.pop(0), project)
)
for principal in principal_chain:
principal_summary.append("<-")
principal_summary.append(summarize_principal(principal, project))
return principal_summary
def summarize_proto(proto: json.Object, project: str) -> list[str]:
return [
"RPC: ",
*get_rpc_method_names(proto),
": ",
get_rpc_response(proto),
" (",
get_rpc_object(proto),
*summarize_principal_chain(proto, project),
")",
]
def summarize_http_request(http_request: json.Object) -> list[str]:
try:
status = str(gcp_json_get(http_request, int, "status"))
except KeyError:
status = "RST"
return [
"HTTP: ",
status,
" ",
gcp_json_get(http_request, str, "requestMethod"),
" ",
gcp_json_get(http_request, str, "requestUrl"),
" ",
gcp_json_get(http_request, str, "remoteIp"),
]
def json_to_message(logline: json.Object) -> str:
result: list[str] = [
gcp_json_get(logline, str, "timestamp"),
" [",
get_log_name(logline),
":",
get_resource_id(logline),
"] ",
]
labels = gcp_json_get(logline, JSONObject, "labels", default={})
project = get_project(logline)
if messages := get_messages(logline):
result.extend(messages)
elif payload := gcp_json_get(
logline, JSONObject, "protoPayload", default=None
):
result.extend(summarize_proto(payload, project))
elif payload := gcp_json_get(
logline, JSONObject, "httpRequest", default=None
):
result.extend(summarize_http_request(payload))
elif (key := "authorization.k8s.io/reason") in labels:
result.append(gcp_json_get(labels, str, key))
else:
raise AssertionError("Unable to summarize this log entry!")
return "".join(result)
def main():
from sys import stdin
for orig in stdin:
logline = json_loads(orig)
assert isinstance(logline, dict)
try:
print(json_dumps(json_to_message(logline)), flush=True)
except Exception as e:
print(json_dumps(f"{type(e).__name__}: {e}"))
print(orig, flush=True)
if __name__ == "__main__":
raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment