@kenttw
Created July 8, 2015 07:57
pyspark - ChiSqSelector Error
# `lc` (the training data passed to fit) and `l` (a pair RDD of (key, vector))
# are defined earlier and not shown in this snippet.
from pyspark.mllib.feature import ChiSqSelector
model = ChiSqSelector(5000).fit(sc.parallelize(lc))
chi_l = l.mapValues(lambda x : model.transform (x))
print chi_l.first()
The following error message appears:
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-6-aff828b23220> in <module>()
2 model = ChiSqSelector(5000).fit(sc.parallelize(lc))
3 chi_l = l.mapValues(lambda x : model.transform (x))
----> 4 print chi_l.first()
/opt/spark/python/pyspark/rdd.py in first(self)
1281 ValueError: RDD is empty
1282 """
-> 1283 rs = self.take(1)
1284 if rs:
1285 return rs[0]
/opt/spark/python/pyspark/rdd.py in take(self, num)
1263
1264 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1265 res = self.context.runJob(self, takeUpToNumLeft, p, True)
1266
1267 items += res
/opt/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
878 # SparkContext#runJob.
879 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 880 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions,
881 allowLocal)
882 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
/opt/spark/python/pyspark/rdd.py in _jrdd(self)
2349 command = (self.func, profiler, self._prev_jrdd_deserializer,
2350 self._jrdd_deserializer)
-> 2351 pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
2352 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
2353 bytearray(pickled_cmd),
/opt/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command, obj)
2269 # the serialized command will be compressed by broadcast
2270 ser = CloudPickleSerializer()
-> 2271 pickled_command = ser.dumps(command)
2272 if len(pickled_command) > (1 << 20): # 1M
2273 # The broadcast will have same life cycle as created PythonRDD
/opt/spark/python/pyspark/serializers.py in dumps(self, obj)
425
426 def dumps(self, obj):
--> 427 return cloudpickle.dumps(obj, 2)
428
429
/opt/spark/python/pyspark/cloudpickle.py in dumps(obj, protocol)
620
621 cp = CloudPickler(file,protocol)
--> 622 cp.dump(obj)
623
624 return file.getvalue()
/opt/spark/python/pyspark/cloudpickle.py in dump(self, obj)
105 self.inject_addons()
106 try:
--> 107 return Pickler.dump(self, obj)
108 except RuntimeError as e:
109 if 'recursion' in e.args[0]:
/usr/lib/python2.7/pickle.pyc in dump(self, obj)
222 if self.proto >= 2:
223 self.write(PROTO + chr(self.proto))
--> 224 self.save(obj)
225 self.write(STOP)
226
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
560 write(MARK)
561 for element in obj:
--> 562 save(element)
563
564 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
197 klass = getattr(themodule, name, None)
198 if klass is None or klass is not obj:
--> 199 self.save_function_tuple(obj)
200 return
201
/opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
234 # create a skeleton function object and memoize it
235 save(_make_skel_func)
--> 236 save((code, closure, base_globals))
237 write(pickle.REDUCE)
238 self.memoize(func)
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
546 if n <= 3 and proto >= 2:
547 for element in obj:
--> 548 save(element)
549 # Subtle. Same as in the big comment below.
550 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_list(self, obj)
598
599 self.memoize(obj)
--> 600 self._batch_appends(iter(obj))
601
602 dispatch[ListType] = save_list
/usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
631 write(MARK)
632 for x in tmp:
--> 633 save(x)
634 write(APPENDS)
635 elif n:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
197 klass = getattr(themodule, name, None)
198 if klass is None or klass is not obj:
--> 199 self.save_function_tuple(obj)
200 return
201
/opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
234 # create a skeleton function object and memoize it
235 save(_make_skel_func)
--> 236 save((code, closure, base_globals))
237 write(pickle.REDUCE)
238 self.memoize(func)
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
546 if n <= 3 and proto >= 2:
547 for element in obj:
--> 548 save(element)
549 # Subtle. Same as in the big comment below.
550 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_list(self, obj)
598
599 self.memoize(obj)
--> 600 self._batch_appends(iter(obj))
601
602 dispatch[ListType] = save_list
/usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
631 write(MARK)
632 for x in tmp:
--> 633 save(x)
634 write(APPENDS)
635 elif n:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
197 klass = getattr(themodule, name, None)
198 if klass is None or klass is not obj:
--> 199 self.save_function_tuple(obj)
200 return
201
/opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
234 # create a skeleton function object and memoize it
235 save(_make_skel_func)
--> 236 save((code, closure, base_globals))
237 write(pickle.REDUCE)
238 self.memoize(func)
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
546 if n <= 3 and proto >= 2:
547 for element in obj:
--> 548 save(element)
549 # Subtle. Same as in the big comment below.
550 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_list(self, obj)
598
599 self.memoize(obj)
--> 600 self._batch_appends(iter(obj))
601
602 dispatch[ListType] = save_list
/usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
634 write(APPENDS)
635 elif n:
--> 636 save(tmp[0])
637 write(APPEND)
638 # else tmp is empty, and we're done
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
191 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
192 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
--> 193 self.save_function_tuple(obj)
194 return
195 else:
/opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
234 # create a skeleton function object and memoize it
235 save(_make_skel_func)
--> 236 save((code, closure, base_globals))
237 write(pickle.REDUCE)
238 self.memoize(func)
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
546 if n <= 3 and proto >= 2:
547 for element in obj:
--> 548 save(element)
549 # Subtle. Same as in the big comment below.
550 if id(obj) in memo:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_list(self, obj)
598
599 self.memoize(obj)
--> 600 self._batch_appends(iter(obj))
601
602 dispatch[ListType] = save_list
/usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
634 write(APPENDS)
635 elif n:
--> 636 save(tmp[0])
637 write(APPEND)
638 # else tmp is empty, and we're done
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
191 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
192 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
--> 193 self.save_function_tuple(obj)
194 return
195 else:
/opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
239
240 # save the rest of the func data needed by _fill_function
--> 241 save(f_globals)
242 save(defaults)
243 save(dct)
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_dict(self, obj)
647
648 self.memoize(obj)
--> 649 self._batch_setitems(obj.iteritems())
650
651 dispatch[DictionaryType] = save_dict
/usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
684 k, v = tmp[0]
685 save(k)
--> 686 save(v)
687 write(SETITEM)
688 # else tmp is empty, and we're done
/usr/lib/python2.7/pickle.pyc in save(self, obj)
329
330 # Save the reduce() output and finally memoize the object
--> 331 self.save_reduce(obj=obj, *rv)
332
333 def persistent_id(self, obj):
/opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
516
517 if state is not None:
--> 518 save(state)
519 write(pickle.BUILD)
520
/usr/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/usr/lib/python2.7/pickle.pyc in save_dict(self, obj)
647
648 self.memoize(obj)
--> 649 self._batch_setitems(obj.iteritems())
650
651 dispatch[DictionaryType] = save_dict
/usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
679 for k, v in tmp:
680 save(k)
--> 681 save(v)
682 write(SETITEMS)
683 elif n:
/usr/lib/python2.7/pickle.pyc in save(self, obj)
304 reduce = getattr(obj, "__reduce_ex__", None)
305 if reduce:
--> 306 rv = reduce(self.proto)
307 else:
308 reduce = getattr(obj, "__reduce__", None)
/opt/spark/python/pyspark/context.py in __getnewargs__(self)
250 # This method is called when attempting to pickle SparkContext, which is always an error:
251 raise Exception(
--> 252 "It appears that you are attempting to reference SparkContext from a broadcast "
253 "variable, action, or transforamtion. SparkContext can only be used on the driver, "
254 "not in code that it run on workers. For more information, see SPARK-5063."
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
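This is the SPARK-5063 error: the ChiSqSelectorModel returned by fit() is a JavaVectorTransformer that wraps a JVM object and keeps a reference to the SparkContext, so capturing model inside the mapValues lambda forces cloudpickle to serialize the context along with the closure, which is exactly what the traceback shows failing. One way around it is to transform the values RDD as a whole on the driver, since JavaVectorTransformer.transform also accepts an RDD of vectors, and then zip the keys back on. A minimal sketch, assuming l is a pair RDD of (key, Vector) as above:

from pyspark.mllib.feature import ChiSqSelector

model = ChiSqSelector(5000).fit(sc.parallelize(lc))

# Transform all vectors in one call on the driver instead of inside a
# worker-side closure; transform() accepts an RDD[Vector] directly, so the
# model is never shipped to the workers.
keys = l.keys()
selected = model.transform(l.values())

# zip() pairs elements positionally; this assumes the transform preserves
# the number of elements per partition, which a map-style transform does.
chi_l = keys.zip(selected)
print chi_l.first()

The same pattern applies to the other JVM-backed mllib models: call transform on an RDD from the driver rather than per element inside map or mapValues.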