Can't use frozendict due to a pickle error, even though frozendict very happily pickles on its own :(
import pickle

from frozendict import frozendict

# frozendict pickles just fine in the driver process
b = pickle.dumps(frozendict({"hello": "bob"}))
p = pickle.loads(b)
p
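This round-trip works because pickle doesn't store the class itself, only its importable path. A minimal sketch to confirm, assuming only that the frozendict package is installed:

import pickle
import pickletools

from frozendict import frozendict

payload = pickle.dumps(frozendict({"hello": "bob"}))
# The opcode listing shows pickle recording the module and qualified name
# of the class, so any process that can import the frozendict package can
# reconstruct the object.
pickletools.dis(payload)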
import itertools as it
from typing import Iterator

from frozendict import frozendict
from pyspark.sql import Row
from pyspark.sql import functions as F

# Lowercase the names and strip commas (oc is the source DataFrame of officers)
officers_df = oc.select(
    "company_number", F.regexp_replace(F.lower(F.col("name")), ",", "").alias("name")
)

# Emit all pairs of officers at one company
def emit_pairs(x) -> Iterator[Row]:
    """Given a (company_number, officers) group, emit all unique pairs"""
    company_number = x[0]
    officers = list(x[1])
    officer_pairs = it.combinations(officers, 2)
    for officer_pair in officer_pairs:
        if officer_pair[0]["name"] != officer_pair[1]["name"]:
            yield Row(
                **{
                    "company_number": company_number,
                    "name1": officer_pair[0]["name"],
                    "name2": officer_pair[1]["name"],
                }
            )

# Run the function and convert to a DataFrame. Use frozendict to keep the
# rows hashable.
pairs_df = (
    officers_df.rdd.map(lambda x: frozendict(x.asDict()))
    .groupBy(lambda x: x["company_number"])
    .flatMap(emit_pairs)
    .toDF()
)
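If frozendict turns out to be unusable (as the traceback below shows it was here), a hedged alternative is to skip the dict conversion entirely: Row objects are already hashable and support item access, so emit_pairs works on them unchanged. A minimal sketch under that assumption:

# Sketch: group the Row objects directly; Rows are hashable and support
# row["name"] lookups, so no frozendict wrapper is needed
pairs_df = (
    officers_df.rdd
    .groupBy(lambda row: row["company_number"])
    .flatMap(emit_pairs)
    .toDF()
)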
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-32-dcf963c0623c> in <module>
25 # Run the function and convert to a DataFrame. Use frozendict to avoid
26 pairs_df = (
---> 27 officers_df.rdd.map(lambda x: (x.asDict()))
28 .groupBy(lambda x: x["company_number"])
29 .flatMap(emit_pairs)
/usr/lib/spark/python/pyspark/sql/session.py in toDF(self, schema, sampleRatio)
64 [Row(name='Alice', age=1)]
65 """
---> 66 return sparkSession.createDataFrame(self, schema, sampleRatio)
67
68 RDD.toDF = toDF
/usr/lib/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
669 return super(SparkSession, self).createDataFrame(
670 data, schema, samplingRatio, verifySchema)
--> 671 return self._create_dataframe(data, schema, samplingRatio, verifySchema)
672
673 def _create_dataframe(self, data, schema, samplingRatio, verifySchema):
/usr/lib/spark/python/pyspark/sql/session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
692
693 if isinstance(data, RDD):
--> 694 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
695 else:
696 rdd, schema = self._createFromLocal(map(prepare, data), schema)
/usr/lib/spark/python/pyspark/sql/session.py in _createFromRDD(self, rdd, schema, samplingRatio)
480 """
481 if schema is None or isinstance(schema, (list, tuple)):
--> 482 struct = self._inferSchema(rdd, samplingRatio, names=schema)
483 converter = _create_converter(struct)
484 rdd = rdd.map(converter)
/usr/lib/spark/python/pyspark/sql/session.py in _inferSchema(self, rdd, samplingRatio, names)
454 :class:`pyspark.sql.types.StructType`
455 """
--> 456 first = rdd.first()
457 if not first:
458 raise ValueError("The first row in RDD is empty, "
/usr/lib/spark/python/pyspark/rdd.py in first(self)
1584 ValueError: RDD is empty
1585 """
-> 1586 rs = self.take(1)
1587 if rs:
1588 return rs[0]
/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
1564
1565 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1566 res = self.context.runJob(self, takeUpToNumLeft, p)
1567
1568 items += res
/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
1231 # SparkContext#runJob.
1232 mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1233 sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
1234 return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
1235
/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
109 def deco(*a, **kw):
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
113 converted = convert_exception(e.java_exception)
/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 59.0 failed 4 times, most recent failure: Lost task 6.3 in stage 59.0 (TID 3242) (deep-miniconda-3-jupyter-spark-cluster-m.us-west1-c.c.useful-space-293223.internal executor 14): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
out_iter = func(split_index, iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 418, in func
return f(iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2234, in combine
merger.mergeValues(iterator)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 247, in mergeValues
self._spill()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 319, in _spill
self.serializer.dump_stream([(k, v)], streams[h])
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
bytes = self.serializer.dumps(vs)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 506, in dumps
return zlib.compress(self.serializer.dumps(obj), 1)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
return pickle.dumps(obj, pickle_protocol)
_pickle.PicklingError: Can't pickle <class '__main__.frozendict'>: attribute lookup frozendict on __main__ failed
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2254)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2203)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2202)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2202)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2441)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2383)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2372)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
out_iter = func(split_index, iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 2916, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 418, in func
return f(iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2234, in combine
merger.mergeValues(iterator)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 247, in mergeValues
self._spill()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 319, in _spill
self.serializer.dump_stream([(k, v)], streams[h])
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
bytes = self.serializer.dumps(vs)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 506, in dumps
return zlib.compress(self.serializer.dumps(obj), 1)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
return pickle.dumps(obj, pickle_protocol)
_pickle.PicklingError: Can't pickle <class '__main__.frozendict'>: attribute lookup frozendict on __main__ failed
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
The error says __main__.frozendict because I actually tried pasting the class into __main__. A class defined in __main__ gets shipped to the executors by value inside the closure, but the shuffle spill serializes the data with the standard pickler, which looks the class up by module path, and the workers' __main__ doesn't define frozendict.
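One hedged fix under that diagnosis: put the class in a real module so the standard pickler can resolve it by import path on every executor. The module name officers_util.py below is hypothetical; sc.addPyFile is the stock PySpark mechanism for shipping it:

# officers_util.py -- an ordinary module file (hypothetical name), not pasted
# into __main__:
#
#     from frozendict import frozendict  # or define the class here instead

# In the driver session:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
sc.addPyFile("officers_util.py")      # distribute the module to all executors
from officers_util import frozendict  # instances now pickle by module path
                                      # (officers_util.frozendict), which every
                                      # worker can import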