Last active
February 27, 2019 21:53
-
-
Save micimize/90c8dc9426f567b5ed7f26b5018c9ffe to your computer and use it in GitHub Desktop.
Patches for the Python Stackdriver logging handlers that add support for custom fields
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import typing as t | |
from google.cloud.logging.handlers.handlers import ( | |
DEFAULT_LOGGER_NAME, | |
CloudLoggingHandler, | |
) | |
from google.cloud.logging.handlers.transports import background_thread | |
class _Worker(background_thread._Worker):
    """Background worker variant whose queued entries can carry extra fields."""

    def enqueue(
        self,
        record,
        message,
        extra=None,
        resource=None,
        labels=None,
        trace=None,
        span_id=None,
    ):
        """Put one log entry on the worker's queue without blocking.

        :type record: :class:`logging.LogRecord`
        :param record: Python log record that the handler was called with.
                       Its ``name`` and ``levelname`` are copied into the
                       queued entry.

        :type message: str
        :param message: The message from the ``LogRecord`` after being
                        formatted by the associated log formatters.

        :type extra: dict
        :param extra: (Optional) Extra data from the ``LogRecord``,
                      configured and managed by the log handler. Its items
                      are merged into the ``info`` payload after the
                      built-in keys, so an item named ``message`` or
                      ``python_logger`` replaces the built-in value.

        :type resource: :class:`~google.cloud.logging.resource.Resource`
        :param resource: (Optional) Monitored resource of the entry.

        :type labels: dict
        :param labels: (Optional) Mapping of labels for the entry.

        :type trace: str
        :param trace: (Optional) Trace id to apply to the logging entry.

        :type span_id: str
        :param span_id: (Optional) Span id within the trace for the log
                        entry. Specify the trace parameter if span_id is
                        set.
        """
        # ``None`` (or any other falsy value) means "no extra fields".
        additional = extra if extra else {}
        info = {
            "message": message,
            "python_logger": record.name,
            **additional,
        }
        entry = {
            "info": info,
            "severity": record.levelname,
            "resource": resource,
            "labels": labels,
            "trace": trace,
            "span_id": span_id,
        }
        self._queue.put_nowait(entry)
class BackgroundThreadTransport(background_thread.BackgroundThreadTransport):
    """Background-thread transport wired to the patched ``_Worker``."""

    # NOTE: the parent constructor does not accept a worker argument, so the
    # whole ``__init__`` is re-implemented here instead of delegating to it.
    def __init__(
        self,
        client,
        name,
        grace_period=background_thread._DEFAULT_GRACE_PERIOD,
        batch_size=background_thread._DEFAULT_MAX_BATCH_SIZE,
        max_latency=background_thread._DEFAULT_MAX_LATENCY,
    ):
        """Create the patched worker for ``name`` and start it.

        :param client: Logging client; stored on the transport and used to
                       obtain the logger handed to the worker.
        :param name: Name of the logger requested from ``client``.
        :param grace_period: Passed through to the worker unchanged.
        :param batch_size: Passed to the worker as ``max_batch_size``.
        :param max_latency: Passed through to the worker unchanged.
        """
        self.client = client
        worker = _Worker(
            self.client.logger(name),
            grace_period=grace_period,
            max_batch_size=batch_size,
            max_latency=max_latency,
        )
        self.worker = worker
        worker.start()

    def send(
        self,
        record,
        message,
        extra=None,
        resource=None,
        labels=None,
        trace=None,
        span_id=None,
    ):
        """Overrides Transport.send() by handing the entry to the worker.

        :type record: :class:`logging.LogRecord`
        :param record: Python log record that the handler was called with.

        :type message: str
        :param message: The message from the ``LogRecord`` after being
                        formatted by the associated log formatters.

        :type extra: dict
        :param extra: (Optional) Extra data from the ``LogRecord``,
                      configured and managed by the log handler.

        :type resource: :class:`~google.cloud.logging.resource.Resource`
        :param resource: (Optional) Monitored resource of the entry.

        :type labels: dict
        :param labels: (Optional) Mapping of labels for the entry.

        :type trace: str
        :param trace: (Optional) Trace id to apply to the logging entry.

        :type span_id: str
        :param span_id: (Optional) Span id within the trace for the log
                        entry. Specify the trace parameter if span_id is
                        set.
        """
        entry_options = {
            "extra": extra,
            "resource": resource,
            "labels": labels,
            "trace": trace,
            "span_id": span_id,
        }
        self.worker.enqueue(record, message, **entry_options)
class StructuredCloudLoggingHandler(CloudLoggingHandler):
    """Extend the CloudLoggingHandler to add additional fields.

    The names listed in ``additional_fields`` are looked up on every
    ``LogRecord`` and forwarded to the transport as ``extra`` data.
    """

    # Names of the LogRecord attributes to forward with each entry.
    additional_fields: t.List[str]

    def __init__(
        self,
        client,
        additional_fields: t.List[str],
        name=DEFAULT_LOGGER_NAME,
        transport=BackgroundThreadTransport,  # inject our patched transport
        **kwargs
    ):
        """Initialise the parent handler and remember the field names.

        :param client: Logging client passed to ``CloudLoggingHandler``.
        :param additional_fields: Names of the ``LogRecord`` attributes to
                                  forward with each entry.
        :param name: Logger name passed to ``CloudLoggingHandler``.
        :param transport: Transport class passed to ``CloudLoggingHandler``;
                          defaults to the patched transport so that the
                          ``extra`` data can reach the queued entry.
        :param kwargs: Any further keyword arguments for the parent handler.
        """
        super(StructuredCloudLoggingHandler, self).__init__(
            client=client, name=name, transport=transport, **kwargs
        )
        self.additional_fields = additional_fields

    def _extract_extra_fields(self, record) -> t.Dict[str, t.Any]:
        """Collect the configured additional fields present on ``record``.

        :type record: :class:`logging.LogRecord`
        :param record: The record to read the fields from.

        :rtype: dict
        :returns: Mapping of field name to value for every configured field
                  that is set on the record. Fields that are absent or
                  ``None`` are skipped.
        """
        attributes = record.__dict__
        extra: t.Dict[str, t.Any] = {}
        for field in self.additional_fields:
            value = attributes.get(field)
            # Compare against None rather than testing truthiness so that
            # meaningful falsy values (0, False, "", 0.0) are still logged.
            if value is not None:
                extra[field] = value
        return extra

    def emit(self, record):
        """Actually log the specified logging record.

        Overrides the default emit behavior of ``StreamHandler``.
        See https://docs.python.org/3/library/logging.html#handler-objects

        :type record: :class:`logging.LogRecord`
        :param record: The record to be logged.
        """
        # The lookup starts *after* CloudLoggingHandler in the MRO, so any
        # ``format`` defined on CloudLoggingHandler itself is bypassed.
        message = super(CloudLoggingHandler, self).format(record)
        extra = self._extract_extra_fields(record)
        self.transport.send(
            record, message, extra=extra, resource=self.resource, labels=self.labels
        )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import math | |
import typing as t | |
from google.cloud.logging.handlers import ContainerEngineHandler | |
class StructuredStackdriverHandler(ContainerEngineHandler):
    """Extend the ContainerEngineHandler to add additional fields.

    The names listed in ``additional_fields`` are looked up on every
    ``LogRecord`` and added as top-level keys of the emitted JSON payload.
    """

    # Names of the LogRecord attributes to copy into the JSON payload.
    additional_fields: t.List[str]

    def __init__(self, name: str, additional_fields: t.List[str]):
        """Initialise the parent handler and remember the field names.

        :param name: Name passed to ``ContainerEngineHandler``.
        :param additional_fields: Names of the ``LogRecord`` attributes to
                                  copy into the JSON payload.
        """
        super(StructuredStackdriverHandler, self).__init__(name=name)
        self.additional_fields = additional_fields

    def _format_stackdriver_json(self, record, message) -> str:
        """Helper to format a LogRecord in Stackdriver fluentd format.

        :type record: :class:`logging.LogRecord`
        :param record: The record supplying timestamp, thread, severity and
                       the additional fields.

        :type message: str
        :param message: The already formatted log message.

        :rtype: str
        :returns: JSON str to be written to the log file.
        """
        # Split the float epoch time into fractional and whole seconds.
        subsecond, second = math.modf(record.created)

        payload = {
            "message": message,
            "timestamp": {"seconds": int(second), "nanos": int(subsecond * 1e9)},
            "thread": record.thread,
            "severity": record.levelname,
        }

        attributes = record.__dict__
        for field in self.additional_fields:
            value = attributes.get(field)
            # Compare against None rather than testing truthiness so that
            # meaningful falsy values (0, False, "", 0.0) are still logged.
            if value is not None:
                payload[field] = value

        # NOTE(review): json.dumps raises TypeError for values that are not
        # JSON-serialisable; callers must attach only serialisable fields.
        return json.dumps(payload)

    def format(self, record: logging.LogRecord) -> str:
        """Format ``record`` as a Stackdriver JSON string.

        :type record: :class:`logging.LogRecord`
        :param record: The record to format.

        :rtype: str
        :returns: JSON str containing the formatted message and the
                  configured additional fields.
        """
        # The lookup starts *after* ContainerEngineHandler in the MRO, so
        # the parent's own ``format`` is bypassed and only the message text
        # is produced here.
        message = super(ContainerEngineHandler, self).format(record)
        return self._format_stackdriver_json(record, message)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment