Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save alexwoolford/347197e04bb5dd777a7cea6aeb6e967b to your computer and use it in GitHub Desktop.
Save alexwoolford/347197e04bb5dd777a7cea6aeb6e967b to your computer and use it in GitHub Desktop.
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-47-6c1511e75a33> in <module>()
----> 1 process_dataframe(data_exchange)
<ipython-input-46-06eb1c674fde> in process_dataframe(dataframe)
3 column_permutations = [permutation for permutation in permutations(columns, 2)]
4 dataframe_sample = dataframe.sample(withReplacement=False, fraction=0.0001, seed=1234)
----> 5 dataframe_sample.foreach(lambda x: persist_linking_elements(x, column_permutations))
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/dataframe.py in foreach(self, f)
351 >>> df.foreach(f)
352 """
--> 353 return self.rdd.foreach(f)
354
355 @since(1.3)
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/rdd.py in foreach(self, f)
747 f(x)
748 return iter([])
--> 749 self.mapPartitions(processPartition).count() # Force evaluation
750
751 def foreachPartition(self, f):
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/rdd.py in count(self)
1004 3
1005 """
-> 1006 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1007
1008 def stats(self):
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/rdd.py in sum(self)
995 6.0
996 """
--> 997 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
998
999 def count(self):
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
869 # zeroValue provided to each partition is unique from the one provided
870 # to the final reduce call
--> 871 vals = self.mapPartitions(func).collect()
872 return reduce(op, vals, zeroValue)
873
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/rdd.py in collect(self)
771 """
772 with SCCallSiteSync(self.context) as css:
--> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
774 return list(_load_from_socket(port, self._jrdd_deserializer))
775
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/rdd.py in _jrdd(self)
2386 command = (self.func, profiler, self._prev_jrdd_deserializer,
2387 self._jrdd_deserializer)
-> 2388 pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
2389 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
2390 bytearray(pickled_cmd),
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command, obj)
2306 # the serialized command will be compressed by broadcast
2307 ser = CloudPickleSerializer()
-> 2308 pickled_command = ser.dumps(command)
2309 if len(pickled_command) > (1 << 20): # 1M
2310 # The broadcast will have same life cycle as created PythonRDD
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/serializers.py in dumps(self, obj)
426
427 def dumps(self, obj):
--> 428 return cloudpickle.dumps(obj, 2)
429
430
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in dumps(obj, protocol)
644
645 cp = CloudPickler(file,protocol)
--> 646 cp.dump(obj)
647
648 return file.getvalue()
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in dump(self, obj)
105 self.inject_addons()
106 try:
--> 107 return Pickler.dump(self, obj)
108 except RuntimeError as e:
109 if 'recursion' in e.args[0]:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in dump(self, obj)
222 if self.proto >= 2:
223 self.write(PROTO + chr(self.proto))
--> 224 self.save(obj)
225 self.write(STOP)
226
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_tuple(self, obj)
566 write(MARK)
567 for element in obj:
--> 568 save(element)
569
570 if id(obj) in memo:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
197 klass = getattr(themodule, name, None)
198 if klass is None or klass is not obj:
--> 199 self.save_function_tuple(obj)
200 return
201
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
234 # create a skeleton function object and memoize it
235 save(_make_skel_func)
--> 236 save((code, closure, base_globals))
237 write(pickle.REDUCE)
238 self.memoize(func)
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_tuple(self, obj)
552 if n <= 3 and proto >= 2:
553 for element in obj:
--> 554 save(element)
555 # Subtle. Same as in the big comment below.
556 if id(obj) in memo:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_list(self, obj)
604
605 self.memoize(obj)
--> 606 self._batch_appends(iter(obj))
607
608 dispatch[ListType] = save_list
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in _batch_appends(self, items)
637 write(MARK)
638 for x in tmp:
--> 639 save(x)
640 write(APPENDS)
641 elif n:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
197 klass = getattr(themodule, name, None)
198 if klass is None or klass is not obj:
--> 199 self.save_function_tuple(obj)
200 return
201
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
234 # create a skeleton function object and memoize it
235 save(_make_skel_func)
--> 236 save((code, closure, base_globals))
237 write(pickle.REDUCE)
238 self.memoize(func)
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_tuple(self, obj)
552 if n <= 3 and proto >= 2:
553 for element in obj:
--> 554 save(element)
555 # Subtle. Same as in the big comment below.
556 if id(obj) in memo:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_list(self, obj)
604
605 self.memoize(obj)
--> 606 self._batch_appends(iter(obj))
607
608 dispatch[ListType] = save_list
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in _batch_appends(self, items)
637 write(MARK)
638 for x in tmp:
--> 639 save(x)
640 write(APPENDS)
641 elif n:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
197 klass = getattr(themodule, name, None)
198 if klass is None or klass is not obj:
--> 199 self.save_function_tuple(obj)
200 return
201
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
234 # create a skeleton function object and memoize it
235 save(_make_skel_func)
--> 236 save((code, closure, base_globals))
237 write(pickle.REDUCE)
238 self.memoize(func)
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_tuple(self, obj)
552 if n <= 3 and proto >= 2:
553 for element in obj:
--> 554 save(element)
555 # Subtle. Same as in the big comment below.
556 if id(obj) in memo:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_list(self, obj)
604
605 self.memoize(obj)
--> 606 self._batch_appends(iter(obj))
607
608 dispatch[ListType] = save_list
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in _batch_appends(self, items)
637 write(MARK)
638 for x in tmp:
--> 639 save(x)
640 write(APPENDS)
641 elif n:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
197 klass = getattr(themodule, name, None)
198 if klass is None or klass is not obj:
--> 199 self.save_function_tuple(obj)
200 return
201
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
234 # create a skeleton function object and memoize it
235 save(_make_skel_func)
--> 236 save((code, closure, base_globals))
237 write(pickle.REDUCE)
238 self.memoize(func)
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_tuple(self, obj)
552 if n <= 3 and proto >= 2:
553 for element in obj:
--> 554 save(element)
555 # Subtle. Same as in the big comment below.
556 if id(obj) in memo:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_list(self, obj)
604
605 self.memoize(obj)
--> 606 self._batch_appends(iter(obj))
607
608 dispatch[ListType] = save_list
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in _batch_appends(self, items)
640 write(APPENDS)
641 elif n:
--> 642 save(tmp[0])
643 write(APPEND)
644 # else tmp is empty, and we're done
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
197 klass = getattr(themodule, name, None)
198 if klass is None or klass is not obj:
--> 199 self.save_function_tuple(obj)
200 return
201
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
234 # create a skeleton function object and memoize it
235 save(_make_skel_func)
--> 236 save((code, closure, base_globals))
237 write(pickle.REDUCE)
238 self.memoize(func)
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_tuple(self, obj)
552 if n <= 3 and proto >= 2:
553 for element in obj:
--> 554 save(element)
555 # Subtle. Same as in the big comment below.
556 if id(obj) in memo:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_list(self, obj)
604
605 self.memoize(obj)
--> 606 self._batch_appends(iter(obj))
607
608 dispatch[ListType] = save_list
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in _batch_appends(self, items)
640 write(APPENDS)
641 elif n:
--> 642 save(tmp[0])
643 write(APPEND)
644 # else tmp is empty, and we're done
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
191 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
192 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
--> 193 self.save_function_tuple(obj)
194 return
195 else:
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
239
240 # save the rest of the func data needed by _fill_function
--> 241 save(f_globals)
242 save(defaults)
243 save(dct)
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_dict(self, obj)
653
654 self.memoize(obj)
--> 655 self._batch_setitems(obj.iteritems())
656
657 dispatch[DictionaryType] = save_dict
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
690 k, v = tmp[0]
691 save(k)
--> 692 save(v)
693 write(SETITEM)
694 # else tmp is empty, and we're done
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
191 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
192 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
--> 193 self.save_function_tuple(obj)
194 return
195 else:
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
239
240 # save the rest of the func data needed by _fill_function
--> 241 save(f_globals)
242 save(defaults)
243 save(dct)
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_dict(self, obj)
653
654 self.memoize(obj)
--> 655 self._batch_setitems(obj.iteritems())
656
657 dispatch[DictionaryType] = save_dict
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
685 for k, v in tmp:
686 save(k)
--> 687 save(v)
688 write(SETITEMS)
689 elif n:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
191 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
192 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
--> 193 self.save_function_tuple(obj)
194 return
195 else:
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
239
240 # save the rest of the func data needed by _fill_function
--> 241 save(f_globals)
242 save(defaults)
243 save(dct)
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_dict(self, obj)
653
654 self.memoize(obj)
--> 655 self._batch_setitems(obj.iteritems())
656
657 dispatch[DictionaryType] = save_dict
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
685 for k, v in tmp:
686 save(k)
--> 687 save(v)
688 write(SETITEMS)
689 elif n:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
329
330 # Save the reduce() output and finally memoize the object
--> 331 self.save_reduce(obj=obj, *rv)
332
333 def persistent_id(self, obj):
/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
540
541 if state is not None:
--> 542 save(state)
543 write(pickle.BUILD)
544
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save_dict(self, obj)
653
654 self.memoize(obj)
--> 655 self._batch_setitems(obj.iteritems())
656
657 dispatch[DictionaryType] = save_dict
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
685 for k, v in tmp:
686 save(k)
--> 687 save(v)
688 write(SETITEMS)
689 elif n:
/opt/cloudera/parcels/Anaconda/lib/python2.7/pickle.pyc in save(self, obj)
304 reduce = getattr(obj, "__reduce_ex__", None)
305 if reduce:
--> 306 rv = reduce(self.proto)
307 else:
308 reduce = getattr(obj, "__reduce__", None)
TypeError: can't pickle thread.lock objects
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment