Skip to content

Instantly share code, notes, and snippets.

@technillogue
Last active September 13, 2022 05:13
Show Gist options
  • Save technillogue/72eb772b9b02cad64702e595d6880786 to your computer and use it in GitHub Desktop.
Save technillogue/72eb772b9b02cad64702e595d6880786 to your computer and use it in GitHub Desktop.
structured log shipping without sidecars
import asyncio
import json
import logging
import os
import sys
# adapted from https://stackoverflow.com/questions/50144628/python-logging-into-file-as-a-dictionary-or-json
class JsonFormatter(logging.Formatter):
    """
    Formatter that outputs JSON strings after parsing the LogRecord.

    Each record becomes one JSON object containing ``message``,
    ``levelname``, ``module``, ``funcName`` and ``lineno``, plus every
    attribute supplied through ``extra=``.  All other standard LogRecord
    attributes are dropped.  Formatted exception and stack information are
    added under ``exc_info`` and ``stack_info`` when present.
    """

    # Standard LogRecord attributes that are NOT copied into the output.
    # Everything else found on the record is kept, which is how the values
    # passed via ``extra=`` are recovered.
    # https://github.com/python/cpython/blob/main/Lib/logging/__init__.py#L1653
    # Kept as a class-level frozenset so it is built once rather than on
    # every call, and so membership tests are O(1).
    _EXCLUDED_ATTRS = frozenset(
        {
            "name",
            "msg",
            "args",
            "levelno",
            "pathname",
            "filename",
            "exc_info",
            "exc_text",
            "stack_info",
            "created",
            "msecs",
            "relativeCreated",
            "thread",
            "threadName",
            "processName",
            "process",
            # Added to LogRecord in Python 3.12; it is a standard attribute,
            # not a user-supplied extra.
            "taskName",
        }
    )

    def format(self, record: logging.LogRecord) -> str:
        """
        Serialize *record* to a JSON string.

        Values that ``json`` cannot encode natively are converted with
        ``str`` (``default=str``).
        """
        record.message = record.getMessage()
        message_dict = {
            key: value
            for key, value in record.__dict__.items()
            if key not in self._EXCLUDED_ATTRS
        }
        if record.exc_info:
            # Cache the traceback text to avoid converting it multiple times
            # (it's constant anyway)
            if not record.exc_text:
                record.exc_text = self.formatException(record.exc_info)
        if record.exc_text:
            message_dict["exc_info"] = record.exc_text
        if record.stack_info:
            message_dict["stack_info"] = self.formatStack(record.stack_info)
        return json.dumps(message_dict, default=str)
class Vector:
    # Ships everything this process writes to stdout/stderr, plus structured
    # JSON log records, to a `vector` subprocess through a `tee` subprocess.
    #
    # NOTE: every statement in this class body runs once, when the class is
    # defined. Defining the class therefore redirects the process-wide
    # stdout/stderr file descriptors and installs handlers on the root logger.
    # The statement order below matters.
    print("test")
    # keep duplicates of the real stdout/stderr before they are replaced below
    original_stdout = os.dup(sys.stdout.fileno())
    original_stderr = os.dup(sys.stderr.fileno())
    # explicit pipe we'll use for tee later
    tee_read, tee_write = os.pipe()
    # explicit pipe for vector -- we'll be passing this to tee
    vector_read, vector_write = os.pipe()
    # adapted from https://stackoverflow.com/a/651718
    # Cause tee's stdin to get a copy of our stdout/stderr (as well as that
    # of any child processes we spawn)
    # the pipe will buffer everything we write until vector starts reading
    os.dup2(tee_write, sys.stdout.fileno())
    os.dup2(tee_write, sys.stderr.fileno())
    # set up logging on the root logger; handlers below filter by their own level
    logger = logging.getLogger()
    logger.setLevel("DEBUG")
    # write structured logs to only vector, not original stderr
    vector_file = os.fdopen(vector_write, mode="w")
    vector_handler = logging.StreamHandler(vector_file)
    vector_handler.setLevel("DEBUG")
    vector_handler.setFormatter(JsonFormatter())
    logger.addHandler(vector_handler)
    # we want to write formatted logs only to the original stderr, not vector
    # normally this would be sys.stderr
    # but we need to open the duplicated fd as a file
    stderr_file = os.fdopen(original_stderr, mode="w")
    console_handler = logging.StreamHandler(stderr_file)
    fmt = logging.Formatter("{levelname} {module}:{lineno}: {message}", style="{")
    # console verbosity comes from $LOGLEVEL, then $LOG_LEVEL, defaulting to DEBUG
    console_handler.setLevel(
        ((os.getenv("LOGLEVEL") or os.getenv("LOG_LEVEL")) or "DEBUG").upper()
    )
    console_handler.setFormatter(fmt)
    logger.addHandler(console_handler)
    # if i hear about epoll selector one more time i'mna end it
    logging.getLogger("asyncio").setLevel("INFO")
    logging.info("meep meep")

    async def init_vector(self) -> None:
        # Start the `vector` and `tee` subprocesses and store them on
        # self.vector / self.tee.
        # vector reads its input from the read end of the vector pipe
        self.vector = await asyncio.create_subprocess_shell(
            "vector --quiet -c ~/.vector/config/vector.toml",
            stdin=self.vector_read,
            shell=True,
        )
        self.tee = await asyncio.create_subprocess_shell(
            # write to vector's fd and stdout
            f"tee /dev/fd/{self.vector_write}",
            # if we just set stdin to just PIPE, it would be a StreamWriter and not have a real fd
            # so we're using the explicit pipe we opened earlier
            stdin=self.tee_read,
            stdout=self.original_stdout,
            stderr=self.original_stderr,
            # tee should have access to the vector fd
            pass_fds=[self.vector_write],
        )

    # NOTE: cleanup() is intentionally disabled, so this class currently has
    # no cleanup method.
    # this seems to make things exit early without logging
    # anyway init waits for orphaned processes to exit so it's fine
    # async def cleanup(self) -> None:
    #     self.vector.terminate()
    #     self.tee.terminate()
    #     await self.vector.communicate()
    #     await self.tee.communicate()
async def main() -> None:
    """
    Demonstrate the three kinds of output that get shipped to vector:
    plain prints, output of child processes, and structured log records.
    """
    vector = Vector()
    await vector.init_vector()
    # this goes to tee, vector sees it's not json and leaves it unchanged
    print("example print")
    # subprocesses inherit the same thing
    await (await asyncio.create_subprocess_shell("date")).wait()
    # logging is special and prettyprinted to original stderr but structured for vector
    logging.info("a log message with extra information", extra={"attribute": "42"})
    # Deliberately no `await vector.cleanup()` here: Vector does not define a
    # cleanup method (it exists only as commented-out code), so calling it
    # raised AttributeError. The vector and tee children are left running.
# Vector configuration: read lines from stdin, turn JSON lines into
# structured events, and send everything to Honeycomb.

# Events arrive on vector's standard input.
[sources.stdin]
type = "stdin"
# Remap transform applied to every event coming from the stdin source.
[transforms.log_json]
type = "remap"
inputs = ["stdin"]
source = '''
maybe_host = if .host == null {{}} else {{"host": .host}}
# if message is valid json and also an object, merge with that, otherwise keep message
. = merge(maybe_host, object(parse_json(.message) ?? null) ?? {"message": .message})
'''
# Ship the transformed events to the Honeycomb dataset named below.
[sinks.honeycomb]
type = "honeycomb"
inputs = ["log_json"]
# presumably substituted from the HONEYCOMB_API_KEY environment variable by vector -- TODO confirm
api_key = "${HONEYCOMB_API_KEY}"
dataset = "vector-test"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment