Gist by @oluies, created October 26, 2016 11:18
import org.apache.spark.sql.types.{DoubleType, LongType, ShortType, IntegerType, StructField, TimestampType, StructType, StringType, NumericType, BooleanType}
import org.apache.hadoop.fs.{FileSystem, Path}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
def getschemametod(): StructType = {
  StructType(
    Seq(
      StructField("dll", StringType, true),
      StructField("class", StringType, true),
      StructField("method", StringType, true),
      StructField("text", StringType, true),
      StructField("tobiv", ShortType, true)
    )
  )
}
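// Quick sanity check of the schema above (optional; printTreeString is a
// standard StructType method, the call here is purely illustrative):
getschemametod().printTreeString()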
def csvToParquet(file: Path, filetype: String, year: String, month: String): Unit = {
  val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")                  // Use first line of all files as header
    .option("inferSchema", "true")             // Automatically infer data types
    .option("delimiter", ",")
    .option("charset", "ISO-8859-15")
    .option("mode", "DROPMALFORMED")
    .option("treatEmptyValuesAsNulls", "true")
    .option("nullValue", "NULL")
    //.schema(getSchemauttag())
    .load(file.toString)

  import org.apache.spark.sql.SaveMode
  val parquetTarget = s"/data/adobe/confidential/p/raw/${filetype}/v1/country=se/year=${year}/month=${month}"
  println(parquetTarget)
  df.write.mode(SaveMode.Overwrite).parquet(parquetTarget)
}
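// Usage sketch for csvToParquet (the input file is hypothetical; the output
// path is derived from filetype/year/month as shown above):
// csvToParquet(new Path("/data/adobe/confidential/raw/Apr16_uttag.csv"), "uttag", "2016", "04")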
def csvToDF(file: Path, delimiter: String, charset: String = "UTF8",
            useHeader: Boolean = true, schema: Option[StructType] = None) = {
  val reader = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", useHeader.toString)      // Use first line of all files as header
    .option("inferSchema", "true")             // Automatically infer data types
    .option("delimiter", delimiter)
    .option("charset", charset)
    .option("mode", "DROPMALFORMED")
    .option("treatEmptyValuesAsNulls", "true")
    .option("nullValue", "NULL")
  // If an explicit schema was supplied, apply it; otherwise rely on inference
  schema.fold(reader)(reader.schema).load(file.toString)
}
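// Exercising the no-schema branch (hypothetical path): column types come
// from inference there, so a printSchema() is worth a look before writing.
// val inferred = csvToDF(new Path("/data/loggik/confidential/db_sample.csv"), ",")
// inferred.printSchema()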
def processFile(file: Path) = {
  // Month abbreviations to numbers; "Maj" covers the Swedish spelling of May
  val monthtunum = Map("Jan" -> "01", "Feb" -> "02", "Mar" -> "03", "Apr" -> "04",
                       "May" -> "05", "Maj" -> "05", "Jun" -> "06", "Jul" -> "07",
                       "Aug" -> "08", "Sep" -> "09", "Oct" -> "10", "Nov" -> "11", "Dec" -> "12")
  val filename = file.getName()
  println(filename)
  // Expected file name pattern: /Apr16_uttag.csv
  val name = filename.split('.')(0)
  val splitname = name.split('_')
  val date = splitname(0)
  val year = "20" + date.substring(3, 5)
  val month = date.substring(0, 3)
  println(name)
  println(year)
  println(monthtunum(month))
  println(file + " " + year + " " + month)
  // csvToParquet(file, "uttag", year, monthtunum(month))
}
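// processFile is only defined above, never invoked; a sketch (the glob
// pattern is hypothetical) of driving it over every matching extract via
// the Hadoop FileSystem API already imported at the top:
// val fs = FileSystem.get(sc.hadoopConfiguration)
// fs.globStatus(new Path("/data/adobe/confidential/raw/*_uttag.csv"))
//   .foreach(status => processFile(status.getPath))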
def getschedata(): StructType = {
  StructType(
    Seq(
      StructField("id", StringType, true),
      StructField("starttid", TimestampType, true),
      StructField("felkod", StringType, true),
      StructField("dll", StringType, true),
      StructField("klass", StringType, true),
      StructField("metod", StringType, true),
      StructField("indata", StringType, true),
      StructField("KlientIP", StringType, true),
      StructField("TjansteAnrop", StringType, true)
    )
  )
}
val filedata = new Path("/data/loggik/confidential/db*.csv")
val mobilefiledata = new Path("/data/loggik/confidential/mobile/db*.csv")
val dfdata = csvToDF(filedata, ",", useHeader = true, schema = Option(getschedata()))
val dfmobiledata = csvToDF(mobilefiledata, ",", useHeader = true, schema = Option(getschedata()))
dfdata.registerTempTable("dfdata")
dfmobiledata.registerTempTable("dfmobiledata")
val sqltext = """SELECT YEAR(starttid) as year,
MONTH(starttid) AS month,
trim(id) AS id,
starttid,
trim(felkod) AS felkod,
trim(dll) AS dll,
trim(klass) as klass,
trim(metod) as metod,
trim(indata) as indata,
trim(KlientIP) as klientip,
trim(TjansteAnrop) as tjansteAnrop
FROM dfdata"""
val sqltextdfmobiledata = """SELECT YEAR(starttid) as year,
MONTH(starttid) AS month,
trim(id) AS id,
starttid,
trim(felkod) AS felkod,
trim(dll) AS dll,
trim(klass) as klass,
trim(metod) as metod,
trim(indata) as indata,
trim(KlientIP) as klientip,
trim(TjansteAnrop) as tjansteAnrop
FROM dfmobiledata"""
val dfsaldata = sqlContext.sql(sqltext)
val dfsaldatamobile = sqlContext.sql(sqltextdfmobiledata)
val parquetTarget = "/data/loggik/confidential/p/db/v1"
val parquetTargetmobile = "/data/loggik/confidential/mobile/p/db/v1"
import org.apache.spark.sql.SaveMode
dfsaldata.write.partitionBy("year", "month").mode(SaveMode.Append).parquet(parquetTarget)
dfsaldatamobile.write.partitionBy("year", "month").mode(SaveMode.Append).parquet(parquetTargetmobile)
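// Optional read-back check (a sketch; count() scans everything written so
// far, so this is only sensible on modest volumes):
val written = sqlContext.read.parquet(parquetTarget)
println(s"rows in ${parquetTarget}: ${written.count()}")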
// Shell command used to launch this session (spark-csv pulled in via --packages,
// with the corporate HTTP proxy configured for the driver):
// spark-shell --master yarn-client --deploy-mode client --driver-cores 4 --num-executors 8 --conf "spark.driver.extraJavaOptions=-Dhttp.proxyHost=gias.sebank.se -Dhttp.proxyPort=8080 -Dhttps.proxyHost=gias.sebank.se -Dhttps.proxyPort=8080" --packages com.databricks:spark-csv_2.10:1.4.0