// Get a free account here: https://mongolab.com/plans/
// The free plan includes 0.5 GB of storage
// Refer to http://docs.mongodb.org/manual/applications/crud/ for CRUD operations
// List all collections in the current database
db.getCollectionNames();
// Drop a collection
db.testdemo.drop();
Below is my Flume config file for pushing files dropped into a folder to HDFS.
The files are usually about 2 MB in size.
The deserializer.maxLineLength property defaults to 2048, which means that after 2048 bytes
Flume truncates the data and treats the remainder as a new event. As a result, the file that landed in HDFS was full of extra newlines.
I changed the value to 4096000, which is about 4 MB.
#Flume config file
tier1.sources = xml-source1
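Only the first line of the agent definition made it into this snippet. For reference, a rough sketch of what the rest of a spooldir-to-HDFS agent with the larger maxLineLength could look like (the directory paths and the channel/sink names below are placeholders, not the original values):

tier1.channels = mem-channel1
tier1.sinks = hdfs-sink1

# Spooling-directory source; maxLineLength raised so long XML lines are not split into multiple events
tier1.sources.xml-source1.type = spooldir
tier1.sources.xml-source1.spoolDir = /data/incoming/poslog
tier1.sources.xml-source1.deserializer.maxLineLength = 4096000
tier1.sources.xml-source1.channels = mem-channel1

# Simple in-memory channel
tier1.channels.mem-channel1.type = memory
tier1.channels.mem-channel1.capacity = 10000

# HDFS sink writing the raw stream
tier1.sinks.hdfs-sink1.type = hdfs
tier1.sinks.hdfs-sink1.hdfs.path = /user/tsusanto/poslog
tier1.sinks.hdfs-sink1.hdfs.fileType = DataStream
tier1.sinks.hdfs-sink1.channel = mem-channel1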
<POSLog>
<Transaction>
<RetailStoreID>48</RetailStoreID>
<WorkstationID>6</WorkstationID>
<SequenceNumber>73</SequenceNumber>
<BusinessDayDate>2014-09-30</BusinessDayDate>
<EndDateTime>2014-09-30T06:20:14</EndDateTime>
<OperatorID OperatorName="KERRY P">48237</OperatorID>
<CurrencyCode>USD</CurrencyCode>
<RetailTransaction Version="2.1">
spark-shell --packages com.databricks:spark-xml_2.10:0.4.1
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.sql.functions._
val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Transaction").load("/user/tsusanto/POSLog-201409300635-21.xml")
val flattened = df.withColumn("LineItem", explode($"RetailTransaction.LineItem"))
val selectedData = flattened.filter($"SequenceNumber" === "73").select($"RetailStoreID", $"WorkstationID", $"SequenceNumber", $"BusinessDayDate", $"OperatorID._OperatorName" as "OperatorName", $"OperatorID._VALUE" as "OperatorID", $"CurrencyCode", $"RetailTransaction.ReceiptDateTime", $"RetailTransaction.TransactionCount", $"LineItem.SequenceNumber", $"LineItem.Tax.TaxableAmount", $"LineItem.Tax.Amount" as "TaxAmount", $"LineItem.Tax.Percent" as "TaxPercent", $"LineItem.Sale.POSIdentity._POSIDType" as "POSIDType", $"LineItem.Sale.POSIdentity.POSItemID" as "POSItemID", $"LineItem.Sale.Description", $"LineItem.Sale.RegularSalesUnitPrice", $"LineItem.Sale.ExtendedAmount")
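Not part of the original gist, but a quick way to sanity-check the flattened structure in the same spark-shell session before building the select list:

flattened.printSchema()
selectedData.show(5)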
<POSLog
xmlns:poslog="http://www.nrf-arts.org/IXRetail/namespace/"
xmlns="http://www.nrf-arts.org/IXRetail/namespace/"
xmlns:acs="http://www.ncr.com/rsd/acs/tlog/markup/poslog/2.1"
xmlns:raw="http://www.ncr.com/rsd/acs/tlog/markup/raw/base/6.1"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:msxsl="urn:schemas-microsoft-com:xslt"
xmlns:as="urn:ACSScript"
xmlns:acssm="urn:ACSSigMap">
<Transaction>
name := "SparkXmlKudu"
version := "1.0"
scalaVersion := "2.10.5"
resolvers ++= Seq(
"cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
)
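The build.sbt snippet stops after the resolver, so the dependency section is missing here. A sketch of what it might contain for Scala 2.10, matching the spark-xml 0.4.1 package used in the shell example (the Spark and Kudu versions below are assumptions, and the assembly jar name implies the sbt-assembly plugin is also configured):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.6.0" % "provided",
  "com.databricks"   %% "spark-xml"  % "0.4.1",
  "org.apache.kudu"  %% "kudu-spark" % "1.3.0"
)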
/**
* Created by tsusanto on 2/15/2017.
spark-submit --master yarn-client --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
spark-submit --master yarn-cluster --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
*/
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.functions._
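The body of sparkXMLdemo is not included in the gist. A rough sketch of how the imports above could be wired together, reading the POSLog XML and upserting the flattened rows into the Kudu table defined in the DDL that follows (the Kudu master address is a placeholder, and older kudu-spark releases take only the master string in the KuduContext constructor):

object sparkXMLdemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkXmlKudu")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // One DataFrame row per <Transaction> element
    val df = sqlContext.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "Transaction")
      .load("/user/tsusanto/POSLog-201409300635-21.xml")

    // One row per line item; in practice the nested columns would then be
    // selected/renamed to match the Kudu table schema, as in the spark-shell example above
    val flattened = df.withColumn("LineItem", explode($"RetailTransaction.LineItem"))

    // Placeholder master address; table name assumes the Impala-created Kudu table below
    val kuduContext = new KuduContext("kudu-master-host:7051", sc)
    kuduContext.upsertRows(flattened, "impala::default.sales_lines_tenny")
  }
}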
CREATE TABLE sales_lines_tenny
(
LineID STRING,
TransactionID STRING,
RetailStoreID BIGINT,
WorkstationID BIGINT,
SequenceNumber BIGINT,
BusinessDayDate STRING,
OperatorName STRING,
LineItemSequenceNumber BIGINT,
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* export SPARK_KAFKA_VERSION=0.10
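The gist breaks off in the middle of that comment. For context, a minimal direct-stream word count wired up from the imports above might look like this (the broker, group id, and topic are placeholders, following the standard spark-streaming-kafka-0-10 pattern):

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Placeholder connection settings -- not taken from the original gist
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-streaming-demo",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("test-topic")

    // One DStream of ConsumerRecord[String, String] per batch interval
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // Classic word count over the message values
    stream.map(_.value)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}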