This gist demonstrates that Spark 0.9.1 (and presumably also 1.0.0) doesn't serialize a logger instance properly when code runs on workers.
"""
This gist demonstrates that spark 1.0.0 and 0.9.1
don't serialize a logger instance properly when code runs on workers.

Run this code via:
    spark-submit spark_serialization_demo.py
- or -
    pyspark spark_serialization_demo.py
"""
import logging
from os.path import abspath

import pyspark

# initialize logger
log = logging.getLogger('alexTest')
_h = logging.StreamHandler()
_h.setFormatter(logging.Formatter("%(levelname)s %(msg)s"))
log.addHandler(_h)
log.setLevel(logging.DEBUG)
log.info("module imported and logger initialized")

FUNC = 'passes()'


def myfunc(*ignore_args):
    log.debug('logging a line from: %s' % FUNC)
    return 0


def passes():
    # importing the module (rather than closing over myfunc directly) means
    # the worker re-runs the module-level logger setup when it unpickles
    mycode_module = __import__('spark_serialization_demo')
    print(textFile.map(mycode_module.myfunc, preservesPartitioning=True).take(5))


def fails():
    print(textFile.map(myfunc, preservesPartitioning=True).take(5))
    raise Exception("Never reach this point because the code fails first due to a serialization error")


if __name__ == '__main__':
    sc = pyspark.SparkContext("local[10]", 'test')
    textFile = sc.textFile("file://%s" % abspath(__file__), 5)

    print('\n\n---')
    FUNC = 'fails()'
    log.info(
        "This example fails because it serializes a function that "
        "does not initialize the logger when the function is unserialized")
    try:
        fails()
    except Exception as err:
        log.error("See, I failed! Details: %s" % err)

    print('\n---')
    log.info(
        "This example passes because it serializes a module that initializes"
        " the logger when the module is unserialized")
    passes()
@larryhu commented Jul 1, 2016

It doesn't work on 1.6.1. It always throws:

16/07/01 15:25:05 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/danqoo/apps/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/home/danqoo/apps/spark/current/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/home/danqoo/apps/spark/current/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
UnpicklingError: NEWOBJ class argument has NULL tp_new

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
@javibravo commented Jul 27, 2016

Hi,

I am running into the same issue reported by @larryhu. Has anyone found a solution for this?

Thanks,
Javier.

@shett044 commented Aug 2, 2016

This can be solved by calling a get_logger helper inside the mapper function and making sure you pass just a filename.
This ensures it creates N loggers and N log files (where N is the number of executors).
These log files are available under "SPARK_INSTALLATION_DIR/work/", inside an application directory (starts with app-), inside an executor-id directory (these are numbers).
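A minimal sketch of that approach, with hypothetical names (get_logger, the "worker.log" filename): the logger and its FileHandler are built inside the mapper on the worker, so nothing logging-related is ever pickled with the closure, and the relative filename lands in each executor's work directory.

```python
import logging


def get_logger(name="worker", filename="worker.log"):
    """Build (or fetch) the logger on the worker, after unpickling."""
    log = logging.getLogger(name)
    if not log.handlers:  # don't stack a new handler on every call
        h = logging.FileHandler(filename)  # relative path -> executor work dir
        h.setFormatter(logging.Formatter("%(levelname)s %(msg)s"))
        log.addHandler(h)
        log.setLevel(logging.DEBUG)
    return log


def myfunc(line):
    # the logger is created here, on the worker; it is never serialized
    get_logger().debug("processing: %r" % line)
    return 0
```

Used as `textFile.map(myfunc).take(5)`, this avoids the UnpicklingError because the closure only captures plain functions.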

@shett044 commented Aug 2, 2016

Did some digging:
https://medium.com/@anicolaspp/how-to-log-in-apache-spark-f4204fad78a#.cq6k7d7x8
It would be great if someone could convert that code to Python.
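A rough Python analogue of the article's transient-lazy-logger pattern (the class name LogHolder is made up for illustration): drop the logger on pickling via __getstate__ and rebuild it lazily on first use, so instances survive serialization to workers.

```python
import logging


class LogHolder(object):
    """Picklable logging wrapper: the logger is never serialized;
    it is rebuilt lazily on whichever machine first uses it."""

    def __init__(self, name="alexTest"):
        self.name = name
        self._log = None

    @property
    def log(self):
        if self._log is None:
            self._log = logging.getLogger(self.name)
            if not self._log.handlers:
                h = logging.StreamHandler()
                h.setFormatter(logging.Formatter("%(levelname)s %(msg)s"))
                self._log.addHandler(h)
                self._log.setLevel(logging.DEBUG)
        return self._log

    def __getstate__(self):
        state = self.__dict__.copy()
        state["_log"] = None  # never pickle the logger itself
        return state
```

This mirrors Scala's `@transient lazy val log`: only the logger's name travels over the wire, and each executor reconstructs its own handler.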
