spark-submit --master yarn \
  --conf spark.ui.port=12890 \
  --jars "/usr/hdp/2.5.0.0-1245/kafka/libs/spark-streaming-kafka_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/kafka/libs/kafka_2.10-0.8.2.1.jar,/usr/hdp/2.5.0.0-1245/kafka/libs/metrics-core-2.2.0.jar" \
  src/main/python/StreamingKafkaDepartmentCount.py \
  /user/dgadiraju/streamingkafkadepartmentcount/cnt
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys

conf = SparkConf(). \
    setAppName("Streaming Department Count"). \
    setMaster("yarn-client")
sc = SparkContext(conf=conf)
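The gist preview cuts off after the SparkContext is created. A hedged sketch of how StreamingKafkaDepartmentCount.py might continue, assuming a direct Kafka stream of web-server log lines, a department count on the request path, and a 30-second batch interval; the topic name, broker list, and log format below are assumptions, not from the original:

# Hedged sketch -- topic, broker list, batch interval, and log parsing are assumptions.
output_prefix = sys.argv[1]

ssc = StreamingContext(sc, 30)

# Read messages from Kafka; values (msg[1]) are the raw web-server log lines
directKafkaStream = KafkaUtils.createDirectStream(
    ssc, ["fkdemo"], {"metadata.broker.list": "broker01.example.com:6667"})
messages = directKafkaStream.map(lambda msg: msg[1])

# Keep only requests of the form .../department/<name>/... and count per department per batch
departmentMessages = messages.filter(lambda msg: msg.split(" ")[6].split("/")[1] == "department")
departmentNames = departmentMessages.map(lambda msg: (msg.split(" ")[6].split("/")[2], 1))
departmentCountPerBatch = departmentNames.reduceByKey(lambda x, y: x + y)

# Each batch is written under the HDFS prefix passed as the first argument to spark-submit
departmentCountPerBatch.saveAsTextFiles(output_prefix)

ssc.start()
ssc.awaitTermination()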
# wskafka.conf: A single-node Flume configuration
# to read data from webserver logs and publish
# to kafka topic

# Name the components on this agent
wk.sources = ws
wk.sinks = kafka
wk.channels = mem

# Describe/configure the source
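# Hedged sketch of the remainder of wskafka.conf (the gist preview cuts off here) --
# the log path, broker list, topic name, and sink class below are assumptions.
wk.sources.ws.type = exec
wk.sources.ws.command = tail -F /opt/gen_logs/logs/access.log

# Describe the Kafka sink (property names assume the HDP-bundled Flume Kafka sink)
wk.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
wk.sinks.kafka.brokerList = broker01.example.com:6667
wk.sinks.kafka.topic = fkdemo

# Use a channel which buffers events in memory
wk.channels.mem.type = memory
wk.channels.mem.capacity = 1000
wk.channels.mem.transactionCapacity = 100

# Bind the source and sink to the channel
wk.sources.ws.channels = mem
wk.sinks.kafka.channel = mem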
spark-submit --master yarn \
  --conf spark.ui.port=12890 \
  --jars "/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming-flume_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming-flume-sink_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/flume/lib/flume-ng-sdk-1.5.2.2.5.0.0-1245.jar" \
  src/main/python/StreamingFlumeDepartmentCount.py \
  gw01.itversity.com 8123 /user/dgadiraju/streamingflumedepartmentcount/cnt
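For reference, a hedged sketch of what StreamingFlumeDepartmentCount.py might look like, mirroring the Kafka version above but reading from the Flume agent on the host and port passed above; the receiver mode (push-based createStream), batch interval, and log parsing are assumptions, not from the original:

# Hedged sketch -- receiver mode, batch interval, and log parsing are assumptions.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
import sys

host = sys.argv[1]
port = int(sys.argv[2])
output_prefix = sys.argv[3]

conf = SparkConf(). \
    setAppName("Streaming Flume Department Count"). \
    setMaster("yarn-client")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)

# Receive Flume events from the agent on host:port; e[1] is the decoded event body
flumeStream = FlumeUtils.createStream(ssc, host, port)
messages = flumeStream.map(lambda e: e[1])

# Count department page hits per batch and write each batch under the HDFS prefix
departmentCounts = messages. \
    filter(lambda msg: msg.split(" ")[6].split("/")[1] == "department"). \
    map(lambda msg: (msg.split(" ")[6].split("/")[2], 1)). \
    reduceByKey(lambda x, y: x + y)

departmentCounts.saveAsTextFiles(output_prefix)

ssc.start()
ssc.awaitTermination()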
daily_revenue_per_product_df.show(100)
daily_revenue_per_product_df.save("/user/dgadiraju/daily_revenue_save", "json")
daily_revenue_per_product_df.write.json("/user/dgadiraju/daily_revenue_write")
daily_revenue_per_product_df.select("order_date", "daily_revenue_per_product").show()
daily_revenue_per_product_df.filter(daily_revenue_per_product_df["order_date"] == "2013-07-26 00:00:00.0").show()
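A quick, hedged way to confirm the two JSON outputs above were written correctly is to read them back as DataFrames; this check is not part of the original snippet:

# Hedged check -- read back the JSON written above and compare
saved_df = sqlContext.read.json("/user/dgadiraju/daily_revenue_save")
written_df = sqlContext.read.json("/user/dgadiraju/daily_revenue_write")
saved_df.printSchema()
saved_df.count()
written_df.count()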
sqlContext.sql("CREATE DATABASE dgadiraju_daily_revenue"); | |
sqlContext.sql("CREATE TABLE dgadiraju_daily_revenue.daily_revenue (order_date string, product_name string, daily_revenue_per_product float) STORED AS orc") | |
daily_revenue_per_product_df = sqlContext.sql("SELECT o.order_date, p.product_name, sum(oi.order_item_subtotal) daily_revenue_per_product \ | |
FROM orders o JOIN order_items oi \ | |
ON o.order_id = oi.order_item_order_id \ | |
JOIN products p \ | |
ON p.product_id = oi.order_item_product_id \ | |
WHERE o.order_status IN ('COMPLETE', 'CLOSED') \ | |
GROUP BY o.order_date, p.product_name \ |
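ORDER BY o.order_date, daily_revenue_per_product")
# Note: the ORDER BY line above and the write.insertInto below are a hedged guess at the
# truncated remainder of this snippet -- they are assumptions, not the original code.
daily_revenue_per_product_df.write.insertInto("dgadiraju_daily_revenue.daily_revenue")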
sqlContext.sql("use dgadiraju_retail_db_txt") | |
from pyspark.sql import Row | |
productsRaw = open("/data/retail_db/products/part-00000").read().splitlines() | |
productsRDD = sc.parallelize(productsRaw) | |
productsDF = productsRDD.\ | |
map(lambda p: Row(product_id=int(p.split(",")[0]), product_name=p.split(",")[2])).\ | |
toDF() | |
productsDF.registerTempTable("products") | |
sqlContext.sql("select * from products").show() |
from pyspark.sql import Row
ordersRDD = sc.textFile("/public/retail_db/orders")
ordersDF = ordersRDD. \
    map(lambda o: Row(order_id=int(o.split(",")[0]), order_date=o.split(",")[1], order_customer_id=int(o.split(",")[2]), order_status=o.split(",")[3])).toDF()
ordersDF.registerTempTable("ordersDF_table")
sqlContext.sql("select order_status, count(1) from ordersDF_table group by order_status").show()
select * from (
  select o.order_id, o.order_date, o.order_status, oi.order_item_subtotal,
    round(sum(oi.order_item_subtotal) over (partition by o.order_id), 2) order_revenue,
    oi.order_item_subtotal/round(sum(oi.order_item_subtotal) over (partition by o.order_id), 2) pct_revenue,
    round(avg(oi.order_item_subtotal) over (partition by o.order_id), 2) avg_revenue,
    -- rank_revenue is referenced in the final order by; the expression below (rank of each
    -- item's subtotal within its order) is an assumption, added so the query resolves
    rank() over (partition by o.order_id order by oi.order_item_subtotal desc) rank_revenue
  from orders o join order_items oi
    on o.order_id = oi.order_item_order_id
  where o.order_status in ('COMPLETE', 'CLOSED')) q
where order_revenue >= 1000
order by order_date, order_revenue desc, rank_revenue;
select o.order_id, o.order_date, o.order_status, round(sum(oi.order_item_subtotal), 2) order_revenue
from orders o join order_items oi
  on o.order_id = oi.order_item_order_id
where o.order_status in ('COMPLETE', 'CLOSED')
group by o.order_id, o.order_date, o.order_status
having sum(oi.order_item_subtotal) >= 1000
order by o.order_date, order_revenue desc;

select o.order_id, o.order_date, o.order_status, round(sum(oi.order_item_subtotal), 2) order_revenue
from orders o join order_items oi