This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
input { | |
file { | |
path => ["/opt/gen_logs/logs/access.log"] | |
type => "apache_access" | |
} | |
} | |
filter { | |
grok { | |
match => [ | |
"message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}", |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Submit the CountryVisitCount class from the workshop jar to YARN.
# --jars takes a comma-separated list, so the space-separated glob expansion of
# /external_jars/*.jar is rewritten with commas; the substitution is quoted so
# the resulting list always reaches spark-submit as a single argument.
# The trailing "prod" is passed to the application as its first argument
# (presumably the configuration environment name -- confirm against the job).
/opt/mapr/spark/spark-2.2.1/bin/spark-submit \
  --class CountryVisitCount \
  --master yarn \
  --conf spark.ui.port=4926 \
  --jars "$(echo /external_jars/*.jar | tr ' ' ',')" \
  kafkaworkshopmapr_2.11-0.1.jar prod
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the KafkaWorkshopMapr project.
name := "KafkaWorkshopMapr"

version := "0.1"

scalaVersion := "2.11.11"

// All Spark modules must be on the same release; keep the version in one place.
val sparkVersion = "2.2.1"

// `%%` appends the Scala binary version derived from scalaVersion (here _2.11),
// so the artifact suffix cannot drift from the compiler version.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Properties
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import scala.sys.process._

// Configuration for a Kafka producer that sends String keys and String values.
// setProperty is used instead of the inherited Hashtable.put so that only
// String keys and values can ever be stored in the Properties object.
val props = new Properties()
// Brokers contacted for the initial connection to the cluster.
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "rm01.itversity.com:6667,nn02.itversity.com:6667,nn01.itversity.com:6667")
props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "ScalaProducerExample")
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.{Collections, Properties} | |
import com.typesafe.config.ConfigFactory | |
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} | |
import scala.collection.JavaConversions._ | |
object KafkaConsumerExample { | |
def main(args: Array[String]): Unit = { | |
val conf = ConfigFactory.load |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Properties | |
import com.typesafe.config.ConfigFactory | |
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} | |
object KafkaProducerExample { | |
def main(args: Array[String]): Unit = { | |
val conf = ConfigFactory.load | |
val envProps = conf.getConfig(args(0)) | |
val props = new Properties() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools as it
import sys
from functools import reduce

# Read only the first 50 records; the context manager closes the file handle
# as soon as the lines have been loaded.
with open("/data/retail_db/order_items/part-00000", "r") as order_items_file:
    order_items = order_items_file.read().splitlines()[:50]

# One (int(field 1), float(field 4)) pair per comma-separated record
# (presumably order id and item subtotal, going by the variable names).
order_subtotal = []
for record in order_items:
    fields = record.split(",")
    order_subtotal.append((int(fields[1]), float(fields[4])))

# itertools.groupby only merges adjacent items, so the pairs are sorted first.
iterator = it.groupby(sorted(order_subtotal), lambda pair: pair[0])

# Lazily add up the second element of every pair within each group.
revenuePerOrder = (
    (key, reduce(lambda a, b: a + b, (pair[1] for pair in group)))
    for key, group in iterator
)
print(list(revenuePerOrder))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import reduce


def getTotalById(order_items, id):
    """Add up field 4 of every record whose field 1 equals ``id``.

    Args:
        order_items: iterable of comma-separated record strings.
        id: integer compared against ``int(record.split(",")[1])``.

    Returns:
        The float sum of ``float(record.split(",")[4])`` over the matching
        records, or ``0.0`` when no record matches.
    """
    matching_subtotals = (
        float(fields[4])
        for fields in (record.split(",") for record in order_items)
        if int(fields[1]) == id
    )
    # The 0.0 initial value makes the empty case return 0.0; without it,
    # reduce raises TypeError on an empty sequence.
    return reduce(lambda total, value: total + value, matching_subtotals, 0.0)
# Load every record; the context manager closes the file handle once the
# contents have been read.
with open("/data/retail_db/order_items/part-00000", "r") as order_items_file:
    order_items = order_items_file.read().splitlines()

# Example call: total for id 5 within the first 20 records.
getTotalById(order_items[:20], 5)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def getOrderItemTuples(order_items):
    """Convert comma-separated records into ``(int, float)`` tuples.

    Args:
        order_items: iterable of comma-separated record strings.

    Returns:
        A list with one ``(int(field 1), float(field 4))`` tuple per record,
        in input order.
    """
    tuples = []
    for record in order_items:
        # Split once per record and reuse the fields for both conversions.
        fields = record.split(",")
        tuples.append((int(fields[1]), float(fields[4])))
    return tuples
# Load every record; the context manager closes the file handle once the
# contents have been read.
with open("/data/retail_db/order_items/part-00000", "r") as order_items_file:
    order_items = order_items_file.read().splitlines()

# Example call: convert the first 20 records.
getOrderItemTuples(order_items[:20])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def sumOfInt(n, fun):
    """Return ``fun(0) + fun(1) + ... + fun(n)``.

    Args:
        n: inclusive upper bound of the integer range; a negative ``n`` gives
            an empty range.
        fun: callable applied to each integer from 0 through ``n``.

    Returns:
        The accumulated total, starting from 0 (so 0 for an empty range).
    """
    # Named `total` rather than `sum` so the builtin is not shadowed.
    total = 0
    for i in range(0, n + 1):
        total = total + fun(i)
    return total


# Example: sum of squares 0^2 + 1^2 + ... + 5^2.
sumOfInt(5, lambda x: x * x)