-
-
Save szeevs/938ad3cf96e732d4b1b55a74015aed5b to your computer and use it in GitHub Desktop.
Supplementary handler that writes logs to stdout, which allows FileTaskHandler to read the K8s pod log from stdout using the K8s API.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import sys | |
from airflow_common.custom_logging.file_task_handler import FileTaskHandler | |
from airflow.utils.log.logging_mixin import LoggingMixin | |
class KubernetesTaskHandler(FileTaskHandler, LoggingMixin):
    """
    Supplementary task log handler that writes log records to stdout.

    Writing to the interpreter's original stdout (``sys.__stdout__``) lets
    FileTaskHandler read a Kubernetes pod's log back from stdout via the
    Kubernetes API instead of from a local file.
    """

    def __init__(self, base_log_folder, filename_template):
        """
        :param base_log_folder: base folder to store logs locally
        :param filename_template: template for the log file name; passed
            through unchanged to FileTaskHandler
        """
        super(KubernetesTaskHandler, self).__init__(
            base_log_folder, filename_template)
        # Set once close() has run, so repeated close() calls (e.g. from
        # logging.shutdown) become no-ops.
        self.closed = False
        # Cleared by set_context() for raw task instances; when False,
        # close() skips closing both the stdout handler and the parent.
        self.mark_end_on_close = True
        # StreamHandler bound to sys.__stdout__, created in set_context().
        self.handler = None
        # Guards against building the stdout handler more than once.
        self.context_set = False

    def set_context(self, ti):
        """
        Provide task_instance context to the airflow task handler.

        Builds a StreamHandler bound to ``sys.__stdout__`` that uses this
        handler's level and formatter. Only the first call builds it; later
        calls only refresh ``mark_end_on_close``.

        :param ti: task instance object; ``ti.raw`` decides whether close()
            performs the end-of-task cleanup
        """
        # NOTE(review): for raw runs the cleanup in close() is skipped,
        # presumably because another (non-raw) process owns it -- confirm.
        self.mark_end_on_close = not ti.raw
        if self.context_set:
            # We don't want to re-set up the handler if this logger has
            # already been initialized.
            return
        # sys.__stdout__ is the interpreter's original stdout, so records
        # still reach the pod's stdout even if sys.stdout was redirected.
        self.handler = logging.StreamHandler(stream=sys.__stdout__)
        self.handler.setLevel(self.level)
        self.handler.setFormatter(self.formatter)
        self.context_set = True

    def close(self):
        """
        Close the stdout handler and the parent handler at most once.

        On application exit logging.shutdown() closes every handler;
        ``self.closed`` turns repeated calls into no-ops. Nothing besides
        marking this handler closed happens when ``mark_end_on_close`` is
        False or when set_context() was never called.
        """
        if self.closed:
            return
        # Raw task instance, or the context was never set: nothing to
        # finalize beyond marking this handler closed.
        if not self.mark_end_on_close or self.handler is None:
            self.closed = True
            return
        stream = self.handler.stream
        # StreamHandler.close() does not flush, so flush explicitly to push
        # buffered records to stdout. A StreamHandler has no _open() method
        # (only FileHandler does), so a missing or already-closed stream
        # cannot be reopened; it is skipped instead of crashing shutdown.
        if stream is not None and not getattr(stream, "closed", False):
            self.handler.flush()
        self.handler.close()
        # Restore the interpreter's original stdout in case it was redirected.
        sys.stdout = sys.__stdout__
        super(KubernetesTaskHandler, self).close()
        self.closed = True
@KarthikRajashekaran It doesn't matter; you just need to put it into your log_config.py. This is described at the very beginning of Part 1 of the Medium post.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Where should this file be located in Airflow?