Skip to content

Instantly share code, notes, and snippets.

//get free account here: https://mongolab.com/plans/
//free plan includes 0.5GB of storage
//refer to http://docs.mongodb.org/manual/applications/crud/
//To list all collections you currently have
db.getCollectionNames();
//To drop a collection
db.testdemo.drop();
Below is my Flume config file to push files dropped in folder to HDFS
The files are usually about 2MB in size.
The default property deserializer.maxLineLength is set to 2048. Which means after 2048 bytes of data,
flume truncates the data and treats it as a new event. Thus the resulting file in HDFS had a lot of newlines.
I changed it to 4096000, which is about 4MB
#Flume config file
tier1.sources = xml-source1
<POSLog>
<Transaction>
<RetailStoreID>48</RetailStoreID>
<WorkstationID>6</WorkstationID>
<SequenceNumber>73</SequenceNumber>
<BusinessDayDate>2014-09-30</BusinessDayDate>
<EndDateTime>2014-09-30T06:20:14</EndDateTime>
<OperatorID OperatorName="KERRY P">48237</OperatorID>
<CurrencyCode>USD</CurrencyCode>
<RetailTransaction Version="2.1">
<POSLog
xmlns:poslog="http://www.nrf-arts.org/IXRetail/namespace/"
xmlns="http://www.nrf-arts.org/IXRetail/namespace/"
xmlns:acs="http://www.ncr.com/rsd/acs/tlog/markup/poslog/2.1"
xmlns:raw="http://www.ncr.com/rsd/acs/tlog/markup/raw/base/6.1"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:msxsl="urn:schemas-microsoft-com:xslt"
xmlns:as="urn:ACSScript"
xmlns:acssm="urn:ACSSigMap">
<Transaction>
name := "SparkXmlKudu"
version := "1.0"
scalaVersion := "2.10.5"
resolvers ++= Seq(
"cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
)
CREATE TABLE sales_lines_tenny
(
LineID STRING,
TransactionID STRING,
RetailStoreID BIGINT,
WorkstationID BIGINT,
SequenceNumber BIGINT,
BusinessDayDate STRING,
OperatorName STRING,
LineItemSequenceNumber BIGINT,
//Tenny Susanto
//restart hue that are in bad health
//send alerts
var http = require('http');
var email = require("emailjs");
var querystring = require('querystring')
var server = email.server.connect({
host: "relay01.sc5.coupons.lan"
@tsusanto
tsusanto / parsexml
Last active September 26, 2017 12:49
spark-shell --packages com.databricks:spark-xml_2.10:0.4.1
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Transaction").load("/user/tsusanto/POSLog-201409300635-21.xml")
val flattened = df.withColumn("LineItem", explode($"RetailTransaction.LineItem"))
val selectedData = flattened.filter($"SequenceNumber" === "73" ).select($"RetailStoreID",$"WorkstationID", $"SequenceNumber",$"BusinessDayDate",$"OperatorID._OperatorName" as "OperatorName",$"OperatorID._VALUE" as "OperatorID",$"CurrencyCode",$"RetailTransaction.ReceiptDateTime",$"RetailTransaction.TransactionCount",$"LineItem.SequenceNumber",$"LineItem.Tax.TaxableAmount", $"LineItem.Tax.Amount" as "TaxAmount",$"LineItem.Tax.Percent" as "TaxPercent",$"LineItem.Sale.POSIdentity._POSIDType" as "POSIDType",$"LineItem.Sale.POSIdentity.POSItemID" as "POSItemID" ,$"LineItem.Sale.Description",$"LineItem.Sale.RegularSalesUnitPrice", $"LineItem.Sale.ExtendedAmount", $"LineItem.Sale.D
tier1.sources = filename-source1
tier1.channels = channel1
tier1.sinks = hdfs-sink1
tier1.channels.channel1.type = file
tier1.channels.channel1.checkpointDir = /mnt/data/tenny/flume/checkpoint
tier1.channels.channel1.dataDirs = /mnt/data/tenny/flume/data
tier1.channels.channel1.capacity = 1000000
tier1.channels.channel1.transactionCapacity = 10000
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* export SPARK_KAFKA_VERSION=0.10