Skip to content

Instantly share code, notes, and snippets.

@amir-rahnama
Last active November 23, 2015 11:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amir-rahnama/6c05ea21ba9fca208c35 to your computer and use it in GitHub Desktop.
Save amir-rahnama/6c05ea21ba9fca208c35 to your computer and use it in GitHub Desktop.
Send Result of MapReduce in Apache Spark (PySpark) over to a web socket: http://blog.ambodi.com/web-socket/
# pip install websocket-client
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from websocket import create_connection
def take_rdd_send_to_socket(time, rdd, num=1000):
result = []
taken = rdd.take(num + 1)
print("-------------------------------------------")
print("Time: %s" % time)
print("-------------------------------------------")
for record in taken[:num]:
print(record)
result.append(record)
# NOTE: If you don't create a connection for each RDD, you will end up with serialization error that I put separately in the other file
ws = create_connection(url)
ws.send(json.dumps(result))
ws.close()
if len(taken) > num:
print("...")
print("")
dstream.foreachRDD(take_rdd_send_to_socket)
15/11/21 14:18:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/21 14:18:48 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Traceback (most recent call last):
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 95, in dumps
return bytearray(self.serializer.dumps((func.func, func.deserializers)))
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
return cloudpickle.dumps(obj, 2)
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 646, in dumps
cp.dump(obj)
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
return Pickler.dump(self, obj)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 548, in save_tuple
save(element)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function
self.save_function_tuple(obj)
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
save(f_globals)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 542, in save_reduce
save(state)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 542, in save_reduce
save(state)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 548, in save_tuple
save(element)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 681, in _batch_setitems
save(v)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 315, in save_builtin_function
return self.save_function(obj)
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 191, in save_function
if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
15/11/21 14:18:49 ERROR StreamingContext: Error starting the context, marking it as stopped
java.io.IOException: java.lang.NullPointerException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:130)
at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:130)
at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:130)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:131)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:563)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:602)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:601)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:623)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
... 61 more
Traceback (most recent call last):
File "/Users/ara/dev/iteam/data-mining-worker/spark/map.py", line 45, in <module>
ssc.start()
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/context.py", line 237, in start
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o20.start.
: java.io.IOException: java.lang.NullPointerException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:130)
at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:130)
at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:130)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:131)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:563)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:602)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:601)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:623)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
... 61 more
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment