tier1.sources = filename-source1
tier1.channels = channel1
tier1.sinks = hdfs-sink1
tier1.channels.channel1.type = file
tier1.channels.channel1.checkpointDir = /mnt/data/tenny/flume/checkpoint
tier1.channels.channel1.dataDirs = /mnt/data/tenny/flume/data
tier1.channels.channel1.capacity = 1000000
tier1.channels.channel1.transactionCapacity = 10000
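The config above defines only the file channel; the source and sink it names are not shown. A minimal sketch of what they might look like, assuming a spooling-directory source and an HDFS sink; the source type, directories, and HDFS path are assumptions, not the original settings:

# Hypothetical completion -- names match the config above, but the
# source type, directories, and HDFS path are placeholders.
tier1.sources.filename-source1.type = spooldir
tier1.sources.filename-source1.spoolDir = /mnt/data/tenny/flume/incoming
tier1.sources.filename-source1.channels = channel1
tier1.sinks.hdfs-sink1.type = hdfs
tier1.sinks.hdfs-sink1.hdfs.path = /user/tsusanto/flume/events
tier1.sinks.hdfs-sink1.hdfs.fileType = DataStream
tier1.sinks.hdfs-sink1.channel = channel1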
#!/bin/bash
## Tenny Susanto
## 2017-11-09
## Download messages from Kafka, partition by partition.
## Do not use this against a Kafka topic that is constantly receiving new messages;
## it is only for one-off manual pulls of historical data loaded by the platform team
## (they dump the data into Kafka once and do not write to it again).
## jaas.conf contains the login/password for the Kafka server.
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/jaas.conf"
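The body of the script is cut off above. A minimal sketch of the partition-by-partition pull it describes, using kafka-console-consumer; the topic, broker list, partition count, and consumer config path are placeholders, not the original values:

# Hypothetical body -- topic, brokers, partition count, and the
# consumer.properties path (security settings) are placeholders.
TOPIC="historical-topic"
BROKERS="broker1:9092"
PARTITIONS=10

for ((p = 0; p < PARTITIONS; p++)); do
  # read one partition from the beginning; stop after 30s with no new messages
  kafka-console-consumer \
    --bootstrap-server "$BROKERS" \
    --topic "$TOPIC" \
    --partition "$p" \
    --offset earliest \
    --consumer.config /path/consumer.properties \
    --timeout-ms 30000 > "part-${p}.txt"
done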
// Tenny Susanto
// Restart Hue instances that are in bad health
// and send alerts.
var http = require('http');
var email = require("emailjs");
var querystring = require('querystring');
var server = email.server.connect({
  host: "relay01.sc5.coupons.lan"
});
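The rest of the script is cut off. A sketch of the restart-and-alert step it would need, posting to the Cloudera Manager REST API; the CM host, API version, cluster/service names, credentials, and email addresses are all assumptions:

// Hypothetical restart call -- host, credentials, API version, and
// cluster/service names are placeholders, not the original values.
var options = {
  host: "cm-host.example.lan",
  port: 7180,
  path: "/api/v14/clusters/cluster1/services/hue/commands/restart",
  method: "POST",
  auth: "admin:admin" // HTTP basic auth; replace with real credentials
};

var req = http.request(options, function (res) {
  if (res.statusCode !== 200) {
    // alert if the restart command was not accepted
    server.send({
      text: "Hue restart returned HTTP " + res.statusCode,
      from: "hue-monitor@example.lan",
      to: "oncall@example.lan",
      subject: "Hue restart alert"
    }, function (err) {
      if (err) console.log(err);
    });
  }
});
req.end();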
name := "Spark2Kafka"
version := "1.0"
scalaVersion := "2.11.0"
resolvers ++= Seq(
"cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
)
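The dependency list is not shown above. A plausible set for a Spark 2 / Kafka 0.10 streaming job like this one; the versions are assumptions, not the original build:

// Hypothetical dependencies -- versions are assumptions.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
)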
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
 * Consumes messages from one or more topics in Kafka and does a word count.
 *
 * Before submitting, select the Kafka 0.10 client:
 *   export SPARK_KAFKA_VERSION=0.10
 */
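The driver body is not included in the fragment above. A minimal sketch of the direct-stream word count the comment describes, following the standard kafka010 integration pattern and using the imports above; the brokers, group id, and topic name are placeholders:

// Hypothetical driver -- brokers, group id, and topic are placeholders.
object Spark2Kafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark2Kafka")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark2kafka-demo",
      "auto.offset.reset" -> "latest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("topic1"), kafkaParams))

    // classic word count over the message values
    stream.map(_.value)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}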
CREATE TABLE sales_lines_tenny
(
LineID STRING,
TransactionID STRING,
RetailStoreID BIGINT,
WorkstationID BIGINT,
SequenceNumber BIGINT,
BusinessDayDate STRING,
OperatorName STRING,
LineItemSequenceNumber BIGINT,
/**
 * Created by tsusanto on 2/15/2017.
 *
 * spark-submit --master yarn-client --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
 * spark-submit --master yarn-cluster --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
 */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.functions._
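The body of sparkXMLdemo is not shown. A sketch of the likely flow, using the imports above to read the POSLog XML with spark-xml and write the flattened rows to Kudu; the Kudu master address and target table are assumptions:

// Hypothetical body -- the Kudu master address and table name are
// placeholders; column names/types must match the target table.
object sparkXMLdemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkXmlKudu")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // parse the POSLog file, one row per <Transaction> element
    val df = sqlContext.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "Transaction")
      .load("/user/tsusanto/POSLog-201409300635-21.xml")

    // flatten the nested line items, as in the parsexml snippet below
    val flattened = df.withColumn("LineItem", explode(df("RetailTransaction.LineItem")))

    val kuduContext = new KuduContext("kudu-master:7051")
    kuduContext.insertRows(flattened, "sales_lines_tenny")
  }
}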
name := "SparkXmlKudu"
version := "1.0"
scalaVersion := "2.10.5"
resolvers ++= Seq(
"cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
)
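As with the other build file, the dependency list is cut off. A plausible set for Spark 1.6 on Scala 2.10 with spark-xml and kudu-spark; the versions are assumptions:

// Hypothetical dependencies -- versions are assumptions.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.6.0" % "provided",
  "com.databricks" %% "spark-xml" % "0.4.1",
  "org.apache.kudu" %% "kudu-spark" % "1.1.0"
)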
<POSLog
xmlns:poslog="http://www.nrf-arts.org/IXRetail/namespace/"
xmlns="http://www.nrf-arts.org/IXRetail/namespace/"
xmlns:acs="http://www.ncr.com/rsd/acs/tlog/markup/poslog/2.1"
xmlns:raw="http://www.ncr.com/rsd/acs/tlog/markup/raw/base/6.1"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:msxsl="urn:schemas-microsoft-com:xslt"
xmlns:as="urn:ACSScript"
xmlns:acssm="urn:ACSSigMap">
<Transaction>
spark-shell --packages com.databricks:spark-xml_2.10:0.4.1
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Transaction").load("/user/tsusanto/POSLog-201409300635-21.xml")
val flattened = df.withColumn("LineItem", explode($"RetailTransaction.LineItem"))
val selectedData = flattened.filter($"SequenceNumber" === "73").select(
  $"RetailStoreID", $"WorkstationID", $"SequenceNumber", $"BusinessDayDate",
  $"OperatorID._OperatorName" as "OperatorName", $"OperatorID._VALUE" as "OperatorID",
  $"CurrencyCode", $"RetailTransaction.ReceiptDateTime", $"RetailTransaction.TransactionCount",
  $"LineItem.SequenceNumber", $"LineItem.Tax.TaxableAmount", $"LineItem.Tax.Amount" as "TaxAmount",
  $"LineItem.Tax.Percent" as "TaxPercent", $"LineItem.Sale.POSIdentity._POSIDType" as "POSIDType",
  $"LineItem.Sale.POSIdentity.POSItemID" as "POSItemID", $"LineItem.Sale.Description",
  $"LineItem.Sale.RegularSalesUnitPrice", $"LineItem.Sale.ExtendedAmount", $"LineItem.Sale.D
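The select list above is cut off mid-column. Once the projection is complete, a typical next step in this flow would be to persist the flattened rows; a hedged one-liner, with the output path as a placeholder:

// Hypothetical follow-up -- the output path is a placeholder.
selectedData.write.mode("overwrite").parquet("/user/tsusanto/sales_lines_flat")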