Created: April 7, 2020 08:54
Save soxofaan/8f00d98b5687cdee17ff5cb2713beb26 to your computer and use it in GitHub Desktop.
Decorator based trick to enable Python logging from PySpark executors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
import logging | |
import pyspark | |
from typing import Callable | |
LOG_FORMAT = "[P%(process)s/%(name)s] %(levelname)s: %(message)s"


def ensure_executor_logging(
    f=None, *,
    level=logging.INFO,
    format=LOG_FORMAT
):
    """
    Decorator that guarantees a minimal standard-logging setup for functions
    running inside PySpark executor processes.

    PySpark workers are forked processes that start without any Python logging
    configuration, so records emitted there are normally lost — even when the
    driver script has configured logging. Wrapping an executor-side function
    with this decorator installs a root handler (once) before the function runs.

    Supports both bare usage (``@ensure_executor_logging``) and parameterized
    usage (``@ensure_executor_logging(level=..., format=...)``).
    """

    def _bootstrap_root_logger():
        """Configure the root logger unless something already did (a short stand-in for `logging.basicConfig`)."""
        root_logger = logging.getLogger()
        if not root_logger.handlers:
            stream_handler = logging.StreamHandler()
            stream_handler.setFormatter(logging.Formatter(format))
            root_logger.addHandler(stream_handler)
            if level is not None:
                root_logger.setLevel(level)

    def decorate(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            _bootstrap_root_logger()
            return func(*args, **kwargs)

        return wrapper

    # Bare `@ensure_executor_logging` hands us the function directly;
    # parameterized usage must return the decorator itself.
    return decorate(f) if callable(f) else decorate
@ensure_executor_logging
def manipulate(x):
    """Square `x` (runs on executors; decorator ensures logging is set up there)."""
    # Lazy %-style args: the message is only formatted if the record is
    # actually emitted, instead of eagerly building the string with .format().
    logging.getLogger("manipulate").info("got %s", x)
    return x * x
def main():
    """Driver-side demo: square a small RDD, with logging from both driver and executors."""
    logger = logging.getLogger("main")
    spark_context = pyspark.SparkContext.getOrCreate()
    logger.info("Spark context: {s!r}".format(s=spark_context))
    numbers = spark_context.parallelize(range(10))
    logger.info("rdd: {r!r}".format(r=numbers))
    # `manipulate` executes on the workers, where the decorator kicks in.
    squared = numbers.map(manipulate).collect()
    print(squared)
if __name__ == "__main__":
    # Configure driver-side logging; executor processes get theirs
    # via the `ensure_executor_logging` decorator instead.
    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.
Update: I created a package for this now: https://github.com/soxofaan/epsel