Skip to content

Instantly share code, notes, and snippets.

@MallikarjunaG
Created April 28, 2017 12:31
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save MallikarjunaG/7a03e7c0103fc8cfba42f58f3ffc24bf to your computer and use it in GitHub Desktop.
Save MallikarjunaG/7a03e7c0103fc8cfba42f58f3ffc24bf to your computer and use it in GitHub Desktop.
PySpark HBase and Spark Streaming: Save RDDs to HBase - http://cjcroix.blogspot.in/
1: import sys
2: import json
3: from pyspark import SparkContext
4: from pyspark.streaming import StreamingContext
5:
6:
7: def SaveRecord(rdd):
8: host = 'sparkmaster.example.com'
9: table = 'cats'
10: keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
11: valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
12: conf = {"hbase.zookeeper.quorum": host,
13: "hbase.mapred.outputtable": table,
14: "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
15: "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
16: "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
17: datamap = rdd.map(lambda x: (str(json.loads(x)["id"]),[str(json.loads(x)["id"]),"cfamily","cats_json",x]))
18: datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
19:
if __name__ == "__main__":
    # Driver entry point: stream lines from a TCP socket and persist each
    # micro-batch to HBase via SaveRecord.
    if len(sys.argv) != 3:
        # Usage errors belong on stderr; sys.exit is safer than the
        # site-provided exit() builtin (which may be absent under -S).
        print("Usage: StreamCatsToHBase.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="StreamCatsToHBase")
    ssc = StreamingContext(sc, 1)  # 1-second micro-batch interval
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    lines.foreachRDD(SaveRecord)

    ssc.start()              # Start the computation
    ssc.awaitTermination()   # Wait for the computation to terminate
@srinvias
Copy link

srinvias commented May 5, 2018

Hi Mallikarjuna ,

I tried it the same way, and I am getting the error below. Could you please suggest a fix? Thanks.

datamap.saveAsNewAPIHadoopDataset(conf,keyConverter=keyConv,valueConverter=valueConv)
Traceback (most recent call last):
File "", line 1, in
File "/usr/lib/spark/python/pyspark/rdd.py", line 1408, in saveAsNewAPIHadoopDataset
keyConverter, valueConverter, True)
File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in call
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
: java.lang.NullPointerException
at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:123)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)
at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:387)
at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
at org.apache.spark.api.python.PythonRDD$.saveAsHadoopDataset(PythonRDD.scala:556)
at org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment