Parquet handling: read and inspect Parquet files, and convert Avro to Parquet with spark-shell

Read and inspect Parquet data - showParquet.scala

showParquet.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode

// Disable global Parquet schema merging; the read below turns it back on per-read via option().
sqlContext.setConf("spark.sql.parquet.mergeSchema", "false")

val file = "/user/name/event.parquet"
val parquetFile = sqlContext.read.option("mergeSchema", "true").parquet(file)
parquetFile.registerTempTable("parquetFile")

//val ids = sqlContext.sql("SELECT * FROM parquetFile where ID='9ff10d20fdff' AND SERVER_IP='0.0.0.0'" )
val ids = sqlContext.sql("SELECT * FROM parquetFile")

// On a cluster the println output ends up in the executor logs, not the driver console.
ids.foreach { x => println(x.getAs[String]("ID") + "  " + x.getAs[String]("SERVER_IP")) }
println(ids.count)
ids.printSchema()
sc.stop
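
showParquet.scala relies on the spark-shell bindings (sc, sqlContext), so it can be launched the same way as the conversion script below, for example:

spark-shell -i showParquet.scala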

Convert Avro to Parquet data - avroToParquet.scala

spark-shell -i avroToParquet.scala --conf spark.driver.extraJavaOptions="-Ddataset=$1"
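
Here $1 is the dataset name supplied by the calling shell script; inside the Scala script it is read back with System.getProperty("dataset") and used to name the output Parquet directory. A concrete invocation (the dataset name "weblogs" is only an example) would look like:

spark-shell -i avroToParquet.scala --conf spark.driver.extraJavaOptions="-Ddataset=weblogs"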

avroToParquet.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import scala.collection.mutable.ArrayBuffer
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import org.apache.spark.sql.SaveMode

import com.fasterxml.jackson._
import scala.util.matching.Regex

  // Extract the host token from the line's leading prefix:
  // skip two tokens, a word, a number and an HH:MM:SS time, then capture the next token (the host).
  def matchAndExtractHost(line: String): String = {
    val pattern = """^\S+ \S+ \w+ \d+ \d+:\d+:\d+ (\S+) """

    if (line == null)
      return null
    pattern.r.findFirstMatchIn(line) match {
      case Some(m) => m.group(1)
      case None    => null
    }
  }

  // Pull key="value" pairs out of the line and prepend the extracted host.
  def parseKV(line: String, pattern: String): List[(String, String)] = {
    val kvs = pattern.r.findAllMatchIn(line)
      .map { m => (m.group(1), m.group(2)) }
      .toList
    ("host", matchAndExtractHost(line)) :: kvs
  }
    // Dataset name passed in via -Ddataset=<name> on the spark-shell command line.
    val dataset = Option(System.getProperty("dataset")).getOrElse("default")
    println("================> dataset requested : " + dataset)

    // Do not write _metadata/_common_metadata summary files next to the Parquet output.
    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

    // this is used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._
    import com.databricks.spark.avro._

   sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
   sqlContext.setConf("spark.sql.avro.snappy.level", "5")
   sqlContext.sql("set spark.sql.caseSensitive=true")

    val avro_file = "/data/user/avro/compressed-avro.avro"

    // Read the Avro file (via com.databricks.spark.avro) and show its schema.
    val df2 = sqlContext.read.avro(avro_file)
    df2.printSchema()

    //df2.select("body").toJSON.take(2).foreach { x => println(x) }
    //df2.select("body").take(2).foreach { x => println(convertToString(x.getAs("body")).mkString) }

    // The "body" column holds raw bytes; decode each record to a String.
    var lines = df2.select("body").map { x => new String(x.getAs[Array[Byte]]("body")) }
    //lines.take(3).foreach { x => println(x) }
    //lines = sc.parallelize(lines.take(10))   // uncomment to run against a small sample

    // Match key="value" pairs followed by a comma or whitespace.
    val kvPattern_quote = """(\w+)="([^"]*)"[,\s+]"""
    val kv = lines.map(x => parseKV(x, kvPattern_quote).toMap)

    // Render each key/value Map as a JSON string (json4s), then let Spark infer a schema from the JSON.
    val jkv = kv.map(x => compact(render(x)))

    var eventlog = sqlContext.read.json(jkv)
    //eventlog = eventlog.withColumnRenamed("start", "start2")

    eventlog.printSchema()

    // Append to a per-dataset Parquet directory.
    eventlog.write.mode(SaveMode.Append).option("mergeSchema", "false").parquet("/data/user/parquet/" + dataset + ".parquet")

    sc.stop
    // Exit explicitly so spark-shell -i does not drop into the interactive prompt.
    System.exit(0)
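
To spot-check the output, the written dataset can be read back the same way as in showParquet.scala above; a minimal check, assuming the default dataset name, would be:

// Read the freshly written Parquet back and inspect it (run in spark-shell).
val written = sqlContext.read.parquet("/data/user/parquet/default.parquet")
written.printSchema()
println(written.count)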
                        