@lxneng
Forked from MallikarjunaG/StreamCatsToHBase.py
Created August 20, 2020 14:21
PySpark HBase and Spark Streaming: Save RDDs to HBase - http://cjcroix.blogspot.in/
import sys
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


def save_record(rdd):
    """Write every JSON record in the RDD to the 'cats' HBase table."""
    host = 'sparkmaster.example.com'
    table = 'cats'
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

    def to_put(x):
        # Each record becomes (row_key, [row_key, column_family, qualifier, value]),
        # the list layout StringListToPutConverter expects. Parse the JSON once
        # instead of twice.
        row_key = str(json.loads(x)["id"])
        return (row_key, [row_key, "cfamily", "cats_json", x])

    datamap = rdd.map(to_put)
    datamap.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: StreamCatsToHBase.py <hostname> <port>")
        sys.exit(-1)

    sc = SparkContext(appName="StreamCatsToHBase")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    lines.foreachRDD(save_record)

    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate
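As a standalone illustration of the mapping step, the snippet below shows the (row key, put-list) tuple shape that StringListToPutConverter consumes, using an invented sample record (the `name` and `colour` fields are placeholders, not part of the gist):

```python
import json

# Hypothetical sample record, like one line arriving on the socket stream.
sample = json.dumps({"id": 42, "name": "Tom", "colour": "grey"})

def to_put(x, family="cfamily", qualifier="cats_json"):
    # Mirrors the mapping in the gist above: parse the JSON once, reuse the key.
    row_key = str(json.loads(x)["id"])
    return (row_key, [row_key, family, qualifier, x])

key, value = to_put(sample)
print(key)    # the row key is the stringified "id" field
print(value)  # [row_key, column family, column qualifier, raw JSON string]
```

The raw JSON string is kept as the cell value, so the whole record lands in one column (`cfamily:cats_json`) keyed by its `id`.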
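For local testing, something has to sit on the socket that `ssc.socketTextStream` reads from. A minimal sketch of such a feeder, assuming a placeholder port (9999) and invented record fields:

```python
import json
import socket

def serve_json_lines(host="127.0.0.1", port=9999, records=None):
    # Toy feeder: accept one client and stream newline-delimited JSON,
    # the format the streaming script above consumes line by line.
    records = records or [{"id": i, "name": "cat-%d" % i} for i in range(3)]
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen(1)
    conn, _ = srv.accept()  # blocks until the Spark receiver connects
    for rec in records:
        conn.sendall((json.dumps(rec) + "\n").encode())
    conn.close()
    srv.close()
```

In practice `nc -lk 9999` does the same job; the point is only that each line must be a complete JSON object with an `id` field.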