Skip to content

Instantly share code, notes, and snippets.

@micimize
Last active February 27, 2019 21:53
Show Gist options
  • Save micimize/90c8dc9426f567b5ed7f26b5018c9ffe to your computer and use it in GitHub Desktop.
Patches for the python stackdriver handlers for adding custom fields
import logging
import typing as t
from google.cloud.logging.handlers.handlers import (
DEFAULT_LOGGER_NAME,
CloudLoggingHandler,
)
from google.cloud.logging.handlers.transports import background_thread
class _Worker(background_thread._Worker):
    """Background worker patched to accept structured ``extra`` payload fields.

    Identical to :class:`google.cloud.logging.handlers.transports.background_thread._Worker`
    except that ``enqueue`` merges an ``extra`` dict into the entry's
    ``info`` payload, so custom fields show up as top-level jsonPayload keys.
    """

    def enqueue(
        self,
        record,
        message,
        extra=None,
        resource=None,
        labels=None,
        trace=None,
        span_id=None,
    ):
        """Queues a log entry to be written by the background thread.

        :type record: :class:`logging.LogRecord`
        :param record: Python log record that the handler was called with.

        :type message: str
        :param message: The message from the ``LogRecord`` after being
                        formatted by the associated log formatters.

        :type extra: dict
        :param extra: (Optional) Extra data from the ``LogRecord``.
                      Configured and managed from the log handler.

        :type resource: :class:`~google.cloud.logging.resource.Resource`
        :param resource: (Optional) Monitored resource of the entry.

        :type labels: dict
        :param labels: (Optional) Mapping of labels for the entry.

        :type trace: str
        :param trace: (optional) traceid to apply to the logging entry.

        :type span_id: str
        :param span_id: (optional) span_id within the trace for the log entry.
                        Specify the trace parameter if span_id is set.
        """
        # ``**(extra or {})`` spreads the custom fields beside "message" and
        # "python_logger" so Stackdriver indexes them as structured fields.
        self._queue.put_nowait(
            {
                "info": {
                    "message": message,
                    "python_logger": record.name,
                    **(extra or {}),
                },
                "severity": record.levelname,
                "resource": resource,
                "labels": labels,
                "trace": trace,
                "span_id": span_id,
            }
        )
class BackgroundThreadTransport(background_thread.BackgroundThreadTransport):
    """Transport that wires the patched :class:`_Worker` into the handler."""

    # NOTE we have to override __init__ entirely because the worker class is
    # not injectable as an argument in the upstream implementation.
    def __init__(
        self,
        client,
        name,
        grace_period=background_thread._DEFAULT_GRACE_PERIOD,
        batch_size=background_thread._DEFAULT_MAX_BATCH_SIZE,
        max_latency=background_thread._DEFAULT_MAX_LATENCY,
    ):
        """Start a patched background worker writing to logger ``name``.

        :param client: The Cloud Logging client.
        :param name: The name of the logger to write entries to.
        :param grace_period: (Optional) Seconds to wait for pending logs on exit.
        :param batch_size: (Optional) Maximum number of entries per API call.
        :param max_latency: (Optional) Max seconds to hold an incomplete batch.
        """
        self.client = client
        logger = self.client.logger(name)
        self.worker = _Worker(
            logger,
            grace_period=grace_period,
            max_batch_size=batch_size,
            max_latency=max_latency,
        )
        self.worker.start()

    def send(
        self,
        record,
        message,
        extra=None,
        resource=None,
        labels=None,
        trace=None,
        span_id=None,
    ):
        """Overrides Transport.send(), forwarding ``extra`` to the worker.

        :type record: :class:`logging.LogRecord`
        :param record: Python log record that the handler was called with.

        :type message: str
        :param message: The message from the ``LogRecord`` after being
                        formatted by the associated log formatters.

        :type extra: dict
        :param extra: (Optional) Extra data from the ``LogRecord``.
                      Configured and managed from the log handler.

        :type resource: :class:`~google.cloud.logging.resource.Resource`
        :param resource: (Optional) Monitored resource of the entry.

        :type labels: dict
        :param labels: (Optional) Mapping of labels for the entry.

        :type trace: str
        :param trace: (optional) traceid to apply to the logging entry.

        :type span_id: str
        :param span_id: (optional) span_id within the trace for the log entry.
                        Specify the trace parameter if span_id is set.
        """
        self.worker.enqueue(
            record,
            message,
            extra=extra,
            resource=resource,
            labels=labels,
            trace=trace,
            span_id=span_id,
        )
class StructuredCloudLoggingHandler(CloudLoggingHandler):
    """Extend the CloudLoggingHandler to forward additional record fields.

    Whitelisted attribute names are pulled off each ``LogRecord`` and sent
    to the transport as the ``extra`` dict.
    """

    # Names of LogRecord attributes to forward as structured fields.
    additional_fields: t.List[str]

    def __init__(
        self,
        client,
        additional_fields: t.List[str],
        name=DEFAULT_LOGGER_NAME,
        transport=BackgroundThreadTransport,  # inject our patched transport
        **kwargs
    ):
        """
        :param client: The Cloud Logging client.
        :param additional_fields: LogRecord attribute names to forward.
        :param name: (Optional) Name of the logger to write entries to.
        :param transport: (Optional) Transport class; defaults to the patched
            one so ``extra`` is actually delivered.
        """
        super(StructuredCloudLoggingHandler, self).__init__(
            client=client, name=name, transport=transport, **kwargs
        )
        self.additional_fields = additional_fields

    def _extract_extra_fields(self, record) -> t.Dict[str, t.Any]:
        """Collect the whitelisted fields that are present on ``record``."""
        extra: t.Dict[str, t.Any] = {}
        for field in self.additional_fields:
            value = record.__dict__.get(field, None)
            # ``is not None`` (rather than truthiness) so legitimate falsy
            # values such as 0, False, or "" are not silently dropped.
            if value is not None:
                extra[field] = value
        return extra

    def emit(self, record):
        """Actually log the specified logging record.

        Overrides the default emit behavior of ``StreamHandler``.
        See https://docs.python.org/2/library/logging.html#handler-objects

        :type record: :class:`logging.LogRecord`
        :param record: The record to be logged.
        """
        # super(CloudLoggingHandler, ...) deliberately skips CloudLoggingHandler
        # in the MRO so only the base StreamHandler formatting is applied,
        # mirroring the upstream CloudLoggingHandler.emit implementation.
        message = super(CloudLoggingHandler, self).format(record)
        extra = self._extract_extra_fields(record)
        self.transport.send(
            record, message, extra=extra, resource=self.resource, labels=self.labels
        )
import json
import logging
import math
import typing as t
from google.cloud.logging.handlers import ContainerEngineHandler
class StructuredStackdriverHandler(ContainerEngineHandler):
    """Extend the ContainerEngineHandler to emit additional record fields.

    Whitelisted attribute names are pulled off each ``LogRecord`` and added
    as top-level keys of the fluentd-format JSON payload.
    """

    # Names of LogRecord attributes to include in the JSON payload.
    additional_fields: t.List[str]

    def __init__(self, name: str, additional_fields: t.List[str]):
        """
        :param name: The name of the logger.
        :param additional_fields: LogRecord attribute names to include.
        """
        super(StructuredStackdriverHandler, self).__init__(name=name)
        self.additional_fields = additional_fields

    def _format_stackdriver_json(self, record, message) -> str:
        """Helper to format a LogRecord in Stackdriver fluentd format.

        :rtype: str
        :returns: JSON str to be written to the log file.
        """
        # Split the float epoch into whole seconds and nanoseconds, the
        # timestamp shape fluentd/Stackdriver expects.
        subsecond, second = math.modf(record.created)
        payload = {
            "message": message,
            "timestamp": {"seconds": int(second), "nanos": int(subsecond * 1e9)},
            "thread": record.thread,
            "severity": record.levelname,
        }
        for field in self.additional_fields:
            value = record.__dict__.get(field, None)
            # ``is not None`` (rather than truthiness) so legitimate falsy
            # values such as 0, False, or "" are not silently dropped.
            if value is not None:
                payload[field] = value
        return json.dumps(payload)

    def format(self, record: logging.LogRecord) -> str:
        # super(ContainerEngineHandler, ...) deliberately skips
        # ContainerEngineHandler in the MRO so we get the plain formatted
        # message, then apply our own JSON envelope with the extra fields.
        message = super(ContainerEngineHandler, self).format(record)
        return self._format_stackdriver_json(record, message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment