Skip to content

Instantly share code, notes, and snippets.

@soxofaan
Created April 7, 2020 08:54
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save soxofaan/8f00d98b5687cdee17ff5cb2713beb26 to your computer and use it in GitHub Desktop.
Save soxofaan/8f00d98b5687cdee17ff5cb2713beb26 to your computer and use it in GitHub Desktop.
Decorator-based trick to enable Python logging from PySpark executors
import functools
import logging
import pyspark
from typing import Callable
# Default log record format. ``%(process)s`` tags every line with the emitting
# process id, so output from separate executor worker processes can be told apart.
LOG_FORMAT = "[P%(process)s/%(name)s] %(levelname)s: %(message)s"


def ensure_executor_logging(
        f=None, *,
        level=logging.INFO,
        format=LOG_FORMAT
):
    """
    Decorator to enable standard Python logging from functions that are used in PySpark executors.

    The PySpark workers are forked processes without any Python logging setup.
    This means that standard Python logging messages are lost (even if logging has been set up in the
    PySpark driver script).
    Use this decorator for functions that are used in PySpark executor contexts, to ensure a minimal logging setup.

    Can be used bare (``@ensure_executor_logging``) or with keyword arguments
    (``@ensure_executor_logging(level=logging.DEBUG, format="...")``).

    :param f: the function to decorate when used bare; ``None`` when used with arguments.
    :param level: level set on the root logger when this decorator installs its handler;
        ``None`` leaves the root logger level untouched.
    :param format: format string for the installed handler's ``logging.Formatter``.
    :return: the wrapped function (bare use) or a decorator (parameterized use).
    :raises TypeError: if ``f`` is given but is not callable (e.g. a level passed
        positionally instead of as ``level=...``), which would otherwise be silently ignored.
    """

    def set_up_logging():
        """Set up logging if not already (short version of `logging.basicConfig`)."""
        root = logging.getLogger()
        # Like basicConfig: leave an already configured root logger alone. This also makes
        # the per-call invocation below cheap and idempotent.
        if not root.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(format))
            root.addHandler(handler)
            if level is not None:
                root.setLevel(level)

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            # Set up lazily, at call time, so it happens inside the process that runs the call.
            set_up_logging()
            return func(*args, **kwargs)

        return wrapped

    # Was the decorator used bare (`@ensure_executor_logging`) or with parentheses?
    if f is None:
        return decorator
    if not callable(f):
        raise TypeError(
            "ensure_executor_logging expects a callable (or nothing) as first argument, "
            "got {f!r}; pass options as keyword arguments, e.g. level=...".format(f=f)
        )
    return decorator(f)
@ensure_executor_logging
def manipulate(x):
    """
    Log the received element and return ``x * x``.

    Meant to be mapped over an RDD (see ``main``), so it runs in executor processes.
    The ``ensure_executor_logging`` decorator makes sure the log call below is not lost there.
    """
    # Lazy %-style argument: the message is only formatted if INFO is actually enabled.
    logging.getLogger("manipulate").info("got %s", x)
    return x * x
def main():
    """
    Driver entry point: map ``manipulate`` over ``range(10)`` as an RDD and print the collected result.
    """
    log = logging.getLogger("main")
    # NOTE(review): the context is intentionally not stopped here; getOrCreate may hand back
    # a context owned by the caller.
    sc = pyspark.SparkContext.getOrCreate()
    # Lazy %-style arguments: repr() and formatting only happen if INFO is enabled.
    log.info("Spark context: %r", sc)
    rdd = sc.parallelize(range(10))
    log.info("rdd: %r", rdd)
    # `manipulate` runs in the executors; only the collected results come back to the driver.
    result = rdd.map(manipulate).collect()
    print(result)
if __name__ == '__main__':
    # Driver-side logging setup; functions shipped to the executors get theirs
    # from the ensure_executor_logging decorator instead.
    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
    main()
@soxofaan
Copy link
Author

soxofaan commented Apr 8, 2020

Created a package for this now: https://github.com/soxofaan/epsel

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment