Skip to content

Instantly share code, notes, and snippets.

@rjurney
Created February 20, 2021 05:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rjurney/126971d3179c3bb196f9c992da562179 to your computer and use it in GitHub Desktop.
Save rjurney/126971d3179c3bb196f9c992da562179 to your computer and use it in GitHub Desktop.
Can't use frozendict due to pickle error even though frozendict very happily pickles :(
# Minimal repro: frozendict pickles fine when imported from its installed
# module — the Spark failure happens only when the class lives in __main__.
from frozendict import frozendict
import pickle

# Round-trip a frozendict through pickle to demonstrate it serializes.
# (Original had a typo `afrozendict` and placed the import after its use.)
b = pickle.dumps(frozendict({"hello": "bob"}))
p = pickle.loads(b)
p  # REPL display: frozendict({'hello': 'bob'})
# Lowercase the names
# Normalize officer names: lowercase, then strip commas.
normalized_name = F.regexp_replace(F.lower(F.col("name")), ",", "").alias("name")
officers_df = oc.select("company_number", normalized_name)
# Emit all pairs of officers at one company
def emit_pairs(x) -> Row:
"""Given a list of officers, emit all unique pairs"""
company_number = x[0]
officers = list(x[1])
officer_pairs = it.combinations(officers, 2)
for officer_pair in officer_pairs:
if officer_pair[0]["name"] != officer_pair[1]["name"]:
yield Row(
**{
"company_number": company_number,
"name1": officer_pair[0]["name"],
"name2": officer_pair[1]["name"],
}
)
# Run the function and convert to a DataFrame.
# NOTE: mapping through frozendict raises on the executors —
# "_pickle.PicklingError: Can't pickle <class '__main__.frozendict'>" —
# because a class pasted into __main__ cannot be looked up when workers
# unpickle shuffle data. emit_pairs only needs key access, so the plain
# dict from Row.asDict() works and produces the same output DataFrame.
pairs_df = (
    officers_df.rdd.map(lambda x: x.asDict())
    .groupBy(lambda x: x["company_number"])
    .flatMap(emit_pairs)
    .toDF()
)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-32-dcf963c0623c> in <module>
25 # Run the function and convert to a DataFrame. Use frozendict to avoid
26 pairs_df = (
---> 27 officers_df.rdd.map(lambda x: (x.asDict()))
28 .groupBy(lambda x: x["company_number"])
29 .flatMap(emit_pairs)
/usr/lib/spark/python/pyspark/sql/session.py in toDF(self, schema, sampleRatio)
64 [Row(name='Alice', age=1)]
65 """
---> 66 return sparkSession.createDataFrame(self, schema, sampleRatio)
67
68 RDD.toDF = toDF
/usr/lib/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
669 return super(SparkSession, self).createDataFrame(
670 data, schema, samplingRatio, verifySchema)
--> 671 return self._create_dataframe(data, schema, samplingRatio, verifySchema)
672
673 def _create_dataframe(self, data, schema, samplingRatio, verifySchema):
/usr/lib/spark/python/pyspark/sql/session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
692
693 if isinstance(data, RDD):
--> 694 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
695 else:
696 rdd, schema = self._createFromLocal(map(prepare, data), schema)
/usr/lib/spark/python/pyspark/sql/session.py in _createFromRDD(self, rdd, schema, samplingRatio)
480 """
481 if schema is None or isinstance(schema, (list, tuple)):
--> 482 struct = self._inferSchema(rdd, samplingRatio, names=schema)
483 converter = _create_converter(struct)
484 rdd = rdd.map(converter)
/usr/lib/spark/python/pyspark/sql/session.py in _inferSchema(self, rdd, samplingRatio, names)
454 :class:`pyspark.sql.types.StructType`
455 """
--> 456 first = rdd.first()
457 if not first:
458 raise ValueError("The first row in RDD is empty, "
/usr/lib/spark/python/pyspark/rdd.py in first(self)
1584 ValueError: RDD is empty
1585 """
-> 1586 rs = self.take(1)
1587 if rs:
1588 return rs[0]
/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
1564
1565 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1566 res = self.context.runJob(self, takeUpToNumLeft, p)
1567
1568 items += res
/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
1231 # SparkContext#runJob.
1232 mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1233 sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
1234 return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
1235
/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
109 def deco(*a, **kw):
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
113 converted = convert_exception(e.java_exception)
/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 59.0 failed 4 times, most recent failure: Lost task 6.3 in stage 59.0 (TID 3242) (deep-miniconda-3-jupyter-spark-cluster-m.us-west1-c.c.useful-space-293223.internal executor 14): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
out_iter = func(split_index, iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 418, in func
return f(iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2234, in combine
merger.mergeValues(iterator)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 247, in mergeValues
self._spill()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 319, in _spill
self.serializer.dump_stream([(k, v)], streams[h])
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
bytes = self.serializer.dumps(vs)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 506, in dumps
return zlib.compress(self.serializer.dumps(obj), 1)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
return pickle.dumps(obj, pickle_protocol)
_pickle.PicklingError: Can't pickle <class '__main__.frozendict'>: attribute lookup frozendict on __main__ failed
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2254)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2203)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2202)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2202)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2441)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2383)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2372)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
out_iter = func(split_index, iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 418, in func
return f(iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2234, in combine
merger.mergeValues(iterator)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 247, in mergeValues
self._spill()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 319, in _spill
self.serializer.dump_stream([(k, v)], streams[h])
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
bytes = self.serializer.dumps(vs)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 506, in dumps
return zlib.compress(self.serializer.dumps(obj), 1)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
return pickle.dumps(obj, pickle_protocol)
_pickle.PicklingError: Can't pickle <class '__main__.frozendict'>: attribute lookup frozendict on __main__ failed
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
frozendict
The error says `__main__.frozendict` because I actually tried pasting the class into `__main__`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment