@sambos
Last active May 3, 2018 15:49
Spark Hadoop utils
// Shared imports assumed by the snippets below (the logger `log` and the parsing helpers
// parseKV / parseAsKV / kvPattern_quote are defined elsewhere in the project).
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType}

// Example: load key/value records from Avro, parse each value into a Map, and build a DataFrame.
val rdd = loadAvroData(sc, logPath, suffix).map(x => parseKV(x._2, kvPattern_quote).toMap)
val df = convertToDF(sc, rdd)
// Converts an RDD of field -> value Maps into a DataFrame with a fixed set of string columns.
def convertToDF(sc: SparkContext, rdd: RDD[Map[String, String]]): DataFrame = {
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  val fields = Array("name", "emp", "dept", "nick")
  // Explicit schema (kept for reference; toDF with column names is used below)
  val schema = StructType(fields.map(x => StructField(x, StringType)))
  // Missing keys default to an empty string so every row carries all four columns
  val result = rdd.map(x => (x.getOrElse(fields(0), ""), x.getOrElse(fields(1), ""), x.getOrElse(fields(2), ""), x.getOrElse(fields(3), "")))
  val df = result.toDF(fields: _*)
  df
}
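
// A minimal sanity check of convertToDF; the sample data here is an illustrative assumption,
// not part of the original gist.
val sample = sc.parallelize(Seq(Map("name" -> "sam", "dept" -> "eng")))
val sampleDf = convertToDF(sc, sample)
sampleDf.printSchema()   // name, emp, dept, nick as string columns
sampleDf.show()          // missing keys ("emp", "nick") appear as empty strings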
// Reads Avro files, decodes the binary "body" column to a String, and parses it into (key, value) pairs.
// The suffix parameter is currently unused; parseAsKV is a parsing helper defined elsewhere.
def loadAvroData(sc: SparkContext, logPath: String, suffix: String): RDD[(String, String)] = {
  import com.databricks.spark.avro._
  val sqlContext = new SQLContext(sc)
  sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
  sqlContext.setConf("spark.sql.avro.deflate.level", "9")
  val rdd = sqlContext.read.format("com.databricks.spark.avro").load(logPath).select("body")
    .map(x => new String(x.getAs[Array[Byte]]("body")))
    .map(x => parseAsKV(x))
    .filter(x => x._1 != null && !x._1.contains(" "))   // drop records without a clean key
  rdd
}
// Writes an RDD of JSON strings to Avro, storing each record as a binary "body" field.
def saveToAvro(sc: SparkContext, jsonRdd: RDD[String], avroPath: String) = {
  import com.databricks.spark.avro._
  val sqlContext = new SQLContext(sc)
  //sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
  //sqlContext.setConf("spark.sql.avro.snappy.level", "5")
  sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
  sqlContext.setConf("spark.sql.avro.deflate.level", "9")
  val schema = StructType(Array(StructField("body", BinaryType, false)))
  val df = sqlContext.createDataFrame(jsonRdd.map(x => Row.fromSeq(Seq(x.getBytes))), schema)
  df.printSchema()
  df.write.avro(avroPath)
}
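
// Hypothetical usage: persist a couple of JSON strings as Avro "body" records.
// The output path is an assumption; point it at any writable HDFS or local directory.
val jsonRdd = sc.parallelize(Seq("""{"name":"sam"}""", """{"name":"bob"}"""))
saveToAvro(sc, jsonRdd, "/tmp/sample-avro")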
// Standalone local example: reads Avro files and prints the first two decoded "body" records.
def readAvro = {
  val path = "C://dev//sam//avro1//*.avro"
  def b2s(a: Array[Byte]): String = new String(a)
  val sc: SparkContext = new SparkContext(
    new SparkConf().setMaster("local[1]").setAppName("app"))
  import com.databricks.spark.avro._
  val sqlContext = new SQLContext(sc)
  //sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
  //sqlContext.setConf("spark.sql.avro.snappy.level", "5")
  sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
  sqlContext.setConf("spark.sql.avro.deflate.level", "9")
  val df = sqlContext.read.avro(path)
  df.printSchema()
  val lines = df.select("body").map(x => b2s(x.getAs[Array[Byte]]("body")))
  lines.take(2).foreach(println)
  sc.stop()
}
// Recursively deletes a path from HDFS, logging its existence before and after the delete.
def clean(path: String): Unit = {
  val hadoopConf = new Configuration()
  val uri = FileSystem.getDefaultUri(hadoopConf)
  log.info("Default URI:" + uri)
  val hdfs = FileSystem.get(uri, hadoopConf)
  try {
    log.info(s"${path} Exists: ${hdfs.exists(new Path(path))}")
    log.info("deleted: " + hdfs.delete(new Path(path), true))
    log.info(s"${path} Exists: ${hdfs.exists(new Path(path))}")
  } catch {
    case e: Throwable => log.error("failed to delete " + path, e)
  } finally {
    hdfs.close()
  }
}
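
// Typical use (illustrative paths): clear the target directory before re-running saveToAvro,
// since df.write fails by default (SaveMode.ErrorIfExists) when the output path already exists.
clean("/tmp/sample-avro")
saveToAvro(sc, jsonRdd, "/tmp/sample-avro")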
// Estimates a partition count for a directory: total size divided by the default block size, times 3.
// Returns -1 if the path does not exist or is not a directory.
def getPartitions(filepath: String): Long = {
  val hadoopConf = new Configuration()
  val uri = FileSystem.getDefaultUri(hadoopConf)
  val hdfs = FileSystem.get(uri, hadoopConf)
  val path = new Path(filepath.replaceAll("\\*", ""))
  if (!hdfs.exists(path) || !hdfs.isDirectory(path))
    return -1
  val blockSize = hdfs.getDefaultBlockSize(path)
  val fileSize = hdfs.getContentSummary(path).getLength
  log.info("DefaultBlockSize=" + blockSize)
  log.info("File Size=" + fileSize)
  hdfs.close()
  val count = fileSize / blockSize
  if (count < 1) 1 else count * 3
}
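
// One way to apply the estimate (an assumption about intent, not part of the original gist):
// repartition an RDD to the suggested count before writing it out. The input path is hypothetical.
val parts = getPartitions("/data/logs/*")
val balanced = if (parts > 0) rdd.repartition(parts.toInt) else rdd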