Skip to content

Instantly share code, notes, and snippets.

@tsusanto
Last active June 19, 2020 05:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save tsusanto/5fbea1f137a60a27c13ea36077eabca2 to your computer and use it in GitHub Desktop.
Save tsusanto/5fbea1f137a60a27c13ea36077eabca2 to your computer and use it in GitHub Desktop.
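/**
* Created by tsusanto on 2/15/2017.
* Run in YARN client mode:
*   spark-submit --master yarn-client --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
* Run in YARN cluster mode:
*   spark-submit --master yarn-cluster --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
* (On Spark 2.x the yarn-client / yarn-cluster master URLs are deprecated; use
*  `--master yarn --deploy-mode client` or `--master yarn --deploy-mode cluster` instead.)
*/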
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.functions._
/**
 * Spark job that flattens POSLog XML retail transactions into one row per line item
 * and upserts those rows into a Kudu table.
 *
 * Optional positional arguments. Each falls back to the value this job was originally
 * hard-coded with, so running it with no arguments behaves as before:
 *   - args(0): path of the POSLog XML file to load
 *   - args(1): Kudu master address (host:port)
 *   - args(2): name of the destination Kudu table
 */
object sparkXMLdemo {
  import org.apache.spark.sql.{Column, DataFrame, SQLContext}

  private val DefaultInputPath = "/user/tsusanto/POSLog-201409300635-21.xml"
  private val DefaultKuduMaster = "your-kudu-master-server:7051"
  private val DefaultKuduTable = "sales_lines_tenny"

  def main(args: Array[String]): Unit = {
    val inputPath = args.lift(0).getOrElse(DefaultInputPath)
    val kuduMaster = args.lift(1).getOrElse(DefaultKuduMaster)
    val kuduTable = args.lift(2).getOrElse(DefaultKuduTable)

    val sc = new SparkContext(new SparkConf().setAppName("spark-xml-kudu"))
    try {
      val sqlContext = new SQLContext(sc)
      val salesLines = toSalesLines(readTransactions(sqlContext, inputPath))
      // NOTE(review): the single-argument KuduContext constructor is kept from the original;
      // newer kudu-spark releases also take the SparkContext — confirm against the version in use.
      val kuduContext = new KuduContext(kuduMaster)
      kuduContext.upsertRows(salesLines, kuduTable)
    } finally {
      // Shut the SparkContext down even when reading or upserting fails.
      sc.stop()
    }
  }

  /** Loads `path` with spark-xml, producing one row per `<Transaction>` element. */
  private def readTransactions(sqlContext: SQLContext, path: String): DataFrame =
    sqlContext.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "Transaction")
      // samplingRatio 1: every row is used when inferring the schema.
      .option("samplingRatio", "1")
      .load(path)

  /**
   * Explodes `RetailTransaction.LineItem` so each line item becomes its own row, then
   * projects the lower-case columns written to Kudu.
   *
   * `transactionid` is BusinessDayDate-RetailStoreID-WorkstationID-SequenceNumber, and
   * `lineid` appends the line item's SequenceNumber to it. Spark's `concat` yields null
   * when any part is null, matching the SQL CONCAT this replaces.
   */
  private def toSalesLines(transactions: DataFrame): DataFrame = {
    val lines = transactions.withColumn("LineItem", explode(col("RetailTransaction.LineItem")))

    def asText(name: String): Column = col(name).cast("string")
    val dash = lit("-")

    val transactionId = concat(
      asText("BusinessDayDate"), dash,
      asText("RetailStoreID"), dash,
      asText("WorkstationID"), dash,
      asText("SequenceNumber"))
    val lineId = concat(transactionId, dash, asText("LineItem.SequenceNumber"))

    // Every output column is aliased explicitly. Spark's auto-generated names for
    // unaliased expressions (`_c0`, `_c1`, ...) differ between Spark versions.
    lines.select(
      lineId.as("lineid"),
      transactionId.as("transactionid"),
      col("RetailStoreID").as("retailstoreid"),
      col("WorkstationID").as("workstationid"),
      col("SequenceNumber").as("sequencenumber"),
      col("BusinessDayDate").as("businessdaydate"),
      col("OperatorID._OperatorName").as("operatorname"),
      col("LineItem.SequenceNumber").as("lineitemsequencenumber"),
      col("LineItem.Sale.POSIdentity._POSIDType").as("posidtype"),
      asText("LineItem.Sale.POSIdentity.POSItemID").as("positemid"),
      col("LineItem.Sale.Description").as("description"),
      col("LineItem.Tax.Amount").as("taxamount"),
      col("LineItem.Sale.RegularSalesUnitPrice").as("regularsalesunitprice"),
      col("LineItem.Sale.ExtendedAmount").as("extendedamount"),
      col("LineItem.Sale.DiscountAmount").as("discountamount"),
      col("LineItem.Sale.ExtendedDiscountAmount").as("extendeddiscountamount"),
      col("LineItem.Sale.Quantity").as("quantity"))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment