Skip to content

Instantly share code, notes, and snippets.

@guapodero
Created November 10, 2023 23:08
Show Gist options
  • Save guapodero/132e42565769035084a38f5ee5a4733b to your computer and use it in GitHub Desktop.
Save guapodero/132e42565769035084a38f5ee5a4733b to your computer and use it in GitHub Desktop.
A self-contained CLI for google cloud compute instances.
#!/usr/local/src/tennant_gcp/venv/bin/python
"""
argparse
requests
google-cloud-compute
google-cloud-logging
google-cloud-secret-manager
"""
#;venv=$(head -1 $0 | grep -Po '#!\K.+/venv');
#;sudo python -m venv ${venv};
#;sudo sh -c "${venv}/bin/pip install -r <(cat $0 | grep -Poz '#![^\n]+\n\"\"\"[^\"]+\"\"\"' | grep -av '^[#\"]')";
# tennant_gcp.py
# A self-contained CLI for google cloud compute instances.
# install dependencies:
# eval "$(realpath tennant_gcp.py | xargs sh -c 'grep "^#;" $0 | cut -c 3- | sed s#\$0#$0#')"
# use:
# my_process | tennant_gcp.py log_batch my_process
from queue import Empty as QueueEmpty
import multiprocessing
import signal
import sys
import time
import argparse
import requests
from google.cloud.compute_v1.services import instances
from google.cloud import logging_v2 as logging
from google.cloud.logging_v2._helpers import LogSeverity
from google.cloud.logging_v2.handlers._monitored_resources import detect_resource
from google.cloud import secretmanager_v1 as secretmanager
def instance_metadata(name):
metadata_server = "http://metadata/computeMetadata/v1/instance/"
metadata_flavor = {"Metadata-Flavor": "Google"}
return requests.get(metadata_server + name, headers = metadata_flavor).text
# function docstrings contain permissions which are assumed to be granted to the instance service account
# https://cloud.google.com/compute/docs/access/service-accounts#newserviceaccounts
# https://cloud.google.com/iam/docs/permissions-reference#search
# https://cloud.google.com/iam/docs/creating-custom-roles#creating
args = argparse.ArgumentParser(description="custom google.cloud cli")
action = args.add_subparsers(dest="action")
log = action.add_parser("log", help="log a message to cloud console")
log.add_argument("service")
log.add_argument("severity")
log.add_argument("message", nargs="+")
def log(service, severity, message):
"""
logging.logEntries.create
"""
# https://cloud.google.com/python/docs/reference/logging/latest/logger#logmessagenone-kw
# https://cloud.google.com/python/docs/reference/logging/latest/google.cloud.logging_v2.entries.LogEntry
client = logging.Client()
logger = client.logger(name="instance_services")
message_str = " ".join(message)
logger.log(message_str, severity=severity, labels=dict(service=service))
log_batch = action.add_parser("log_batch", help="log messages from STDIN to cloud console in batch")
log_batch.add_argument("service")
def log_batch(service):
"""
logging.logEntries.create
"""
# start a child process which periodically wakes to send logs
ctx = multiprocessing.get_context("spawn")
ready = ctx.Event()
stop = ctx.Event()
send_queue = ctx.Queue()
sender = ctx.Process(target=batch_sender, args=(ready, stop, send_queue), daemon=True)
sender.start()
ready.wait()
def shutdown(*args):
stop.set()
sender.join()
sys.exit(0)
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
severity_names = [e for e in dir(LogSeverity) if not e.startswith("_")]
line_format = "{} message".format("|".join(severity_names))
format_shown = False
for line in sys.stdin:
[severity, message] = line.strip().split(" ", 1)
if severity not in severity_names:
if not format_shown:
sys.stderr.write(f"invalid input line\n{line}expected {line_format}\n")
format_shown = True
continue
send_queue.put([service, severity, message.strip("'\"")])
shutdown()
def batch_sender(ready, stop, send_queue):
# https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write#body.request_body.FIELDS.entries
# https://github.com/googleapis/python-logging
client = logging.Client()
logger = client.logger(name="instance_services")
# otherwise defaults to global
# https://cloud.google.com/python/docs/reference/logging/latest/logger#google.cloud.logging_v2.logger.Logger
resource = detect_resource()
ready.set()
while True:
if send_queue.empty():
if stop.is_set():
break
try:
time.sleep(1)
continue
except KeyboardInterrupt:
pass # process terminated by parent
with logger.batch() as batch:
while not send_queue.empty():
try:
[service, severity, message] = send_queue.get_nowait()
except QueueEmpty:
break
batch.log(message, severity=severity, labels=dict(service=service), resource=resource)
secret = action.add_parser("secret", help="get a secret from secrets manager")
secret.add_argument("name")
def secret(name):
"""
secretmanager.versions.access
"""
project_id = instance_metadata("zone").split("/")[1]
full_name = f"projects/{project_id}/secrets/{name}/versions/latest"
client = secretmanager.SecretManagerServiceClient()
request = secretmanager.AccessSecretVersionRequest(name=full_name)
response = client.access_secret_version(request=request)
print(response.payload.data.decode("UTF-8"))
if __name__ == "__main__":
pa = args.parse_args()
match pa.action:
case "log": log(pa.service, pa.severity, pa.message)
case "log_batch": log_batch(pa.service)
case "secret": secret(pa.name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment