
#!/bin/bash
##Tenny Susanto
##2017-11-09
##Download messages from Kafka, partition by partition.
##Do not use this against a Kafka topic that is constantly receiving new messages.
##Intended only for one-off manual pulls of historical data by the platform team (they dump the data into Kafka once and don't write to it again).
##The config file referenced below contains the login/password for the Kafka server.
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/jaas.conf"
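
#The consumer loop itself is not shown above; the following is a minimal sketch of
#what a partition-by-partition pull could look like, assuming a Kafka 0.11+ console
#consumer (older releases lack --partition/--offset) and placeholder broker, topic,
#and consumer.properties values (the properties file would carry the SASL settings
#that pair with the JAAS config above).
BROKER="broker01:9092"    # placeholder
TOPIC="historical_topic"  # placeholder
for PARTITION in 0 1 2; do
  kafka-console-consumer \
    --bootstrap-server "$BROKER" \
    --topic "$TOPIC" \
    --partition "$PARTITION" \
    --offset earliest \
    --timeout-ms 10000 \
    --consumer.config consumer.properties \
    > "messages_p${PARTITION}.txt"
done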
/**
 * Created by tsusanto on 2/15/2017.
 * spark-submit --master yarn-client --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
 * spark-submit --master yarn-cluster --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
 */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.functions._
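
// The body of sparkXMLdemo is not included in this gist. Below is a minimal sketch
// of how these imports are typically wired together: read the POSLog XML shown
// further down with spark-xml and append it to the Kudu table created later in
// this gist. The Kudu master address is a placeholder.
object sparkXMLdemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sparkXMLdemo"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Each <Transaction> element becomes one row.
    val df = sqlContext.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "Transaction")
      .load("/user/tsusanto/POSLog-201409300635-21.xml")
    // Append to Kudu via the implicit .kudu writer from org.apache.kudu.spark.kudu._
    df.write
      .options(Map("kudu.master" -> "kudu-master:7051",  // placeholder master
                   "kudu.table" -> "sales_lines_tenny"))
      .mode("append")
      .kudu
  }
}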
<POSLog>
<Transaction>
<RetailStoreID>48</RetailStoreID>
<WorkstationID>6</WorkstationID>
<SequenceNumber>73</SequenceNumber>
<BusinessDayDate>2014-09-30</BusinessDayDate>
<EndDateTime>2014-09-30T06:20:14</EndDateTime>
<OperatorID OperatorName="KERRY P">48237</OperatorID>
<CurrencyCode>USD</CurrencyCode>
<RetailTransaction Version="2.1">
name := "Spark2Kafka"
version := "1.0"
scalaVersion := "2.11.0"
resolvers ++= Seq(
"cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
)
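
//The libraryDependencies block is not part of the snippet; a plausible minimal set
//for a Spark 2 / Kafka 0.10 streaming job, with versions as assumptions:
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
)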
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
 * Consumes messages from one or more topics in Kafka and does a word count.
 * Before running: export SPARK_KAFKA_VERSION=0.10
 */
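
// The rest of the consumer is not shown; a minimal sketch of the direct-stream
// word count the comment describes, using the spark-streaming-kafka-0-10 API
// imported above (brokers, group id, and topic are placeholders):
val conf = new SparkConf().setAppName("Spark2Kafka")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker01:9092",       // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark2kafka-demo",             // placeholder
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("test-topic"), kafkaParams))
// Count words across each batch's message values.
val counts = stream.map(_.value)
  .flatMap(_.split("\\s+"))
  .map((_, 1L))
  .reduceByKey(_ + _)
counts.print()
ssc.start()
ssc.awaitTermination()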
tier1.sources = filename-source1
tier1.channels = channel1
tier1.sinks = hdfs-sink1
tier1.channels.channel1.type = file
tier1.channels.channel1.checkpointDir = /mnt/data/tenny/flume/checkpoint
tier1.channels.channel1.dataDirs = /mnt/data/tenny/flume/data
tier1.channels.channel1.capacity = 1000000
tier1.channels.channel1.transactionCapacity = 10000
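
#The config above names filename-source1 and hdfs-sink1 but only configures the
#channel; a sketch of the missing stanzas, assuming a spooling-directory source
#and an HDFS sink (paths are placeholders):
tier1.sources.filename-source1.type = spooldir
tier1.sources.filename-source1.spoolDir = /mnt/data/tenny/flume/incoming
tier1.sources.filename-source1.channels = channel1
tier1.sinks.hdfs-sink1.type = hdfs
tier1.sinks.hdfs-sink1.hdfs.path = /user/tsusanto/flume/%Y%m%d
tier1.sinks.hdfs-sink1.hdfs.fileType = DataStream
tier1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.hdfs-sink1.channel = channel1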
spark-shell --packages com.databricks:spark-xml_2.10:0.4.1
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Transaction").load("/user/tsusanto/POSLog-201409300635-21.xml")
val flattened = df.withColumn("LineItem", explode($"RetailTransaction.LineItem"))
val selectedData = flattened.filter($"SequenceNumber" === "73").select(
  $"RetailStoreID", $"WorkstationID", $"SequenceNumber", $"BusinessDayDate",
  $"OperatorID._OperatorName" as "OperatorName", $"OperatorID._VALUE" as "OperatorID",
  $"CurrencyCode", $"RetailTransaction.ReceiptDateTime", $"RetailTransaction.TransactionCount",
  $"LineItem.SequenceNumber", $"LineItem.Tax.TaxableAmount", $"LineItem.Tax.Amount" as "TaxAmount",
  $"LineItem.Tax.Percent" as "TaxPercent", $"LineItem.Sale.POSIdentity._POSIDType" as "POSIDType",
  $"LineItem.Sale.POSIdentity.POSItemID" as "POSItemID", $"LineItem.Sale.Description",
  $"LineItem.Sale.RegularSalesUnitPrice", $"LineItem.Sale.ExtendedAmount")
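
//A quick way to sanity-check the flattened result in the same shell session:
selectedData.printSchema()
selectedData.show(5, false)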
//Tenny Susanto
//Restart Hue instances that are in bad health
//Send alerts
var http = require('http');
var email = require("emailjs");
var querystring = require('querystring');
var server = email.server.connect({
  host: "relay01.sc5.coupons.lan"
});
CREATE TABLE sales_lines_tenny
(
LineID STRING,
TransactionID STRING,
RetailStoreID BIGINT,
WorkstationID BIGINT,
SequenceNumber BIGINT,
BusinessDayDate STRING,
OperatorName STRING,
LineItemSequenceNumber BIGINT,
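
--The statement is truncated at this point. For reference, a Kudu-backed Impala
--table of this shape would typically close along these lines (assuming Impala
--2.8+ syntax; the primary key and partitioning below are guesses):
CREATE TABLE sales_lines_tenny
(
  LineID STRING,
  -- ... remaining columns as above ...
  PRIMARY KEY (LineID)
)
PARTITION BY HASH (LineID) PARTITIONS 16
STORED AS KUDU;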
name := "SparkXmlKudu"
version := "1.0"
scalaVersion := "2.10.5"
resolvers ++= Seq(
"cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
)
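
//As with the other build file, the dependency list is cut off; a plausible set
//for a Spark 1.6 / Scala 2.10 build matching the spark-xml 0.4.1 package used
//in the parsexml snippet (versions are assumptions):
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.6.0" % "provided",
  "com.databricks" %% "spark-xml" % "0.4.1",
  "org.apache.kudu" %% "kudu-spark" % "1.3.0"
)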