spark-submit --master yarn \
--conf spark.ui.port=12890 \
--jars "/usr/hdp/2.5.0.0-1245/kafka/libs/spark-streaming-kafka_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/kafka/libs/kafka_2.10-0.8.2.1.jar,/usr/hdp/2.5.0.0-1245/kafka/libs/metrics-core-2.2.0.jar" \
src/main/python/StreamingKafkaDepartmentCount.py \
/user/dgadiraju/streamingkafkadepartmentcount/cnt
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys
conf = SparkConf(). \
setAppName("Streaming Department Count"). \
setMaster("yarn-client")
sc = SparkContext(conf=conf)
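The snippet above stops after creating the SparkContext. A minimal sketch of the rest of StreamingKafkaDepartmentCount.py might look like the following, assuming a hypothetical broker list and topic name, and web-log lines whose 7th space-delimited field is the request path:
ssc = StreamingContext(sc, 30)
# broker list and topic below are hypothetical placeholders -- substitute your cluster's values
kafkaParams = {"metadata.broker.list": "broker01:6667"}
stream = KafkaUtils.createDirectStream(ssc, ["retail_logs"], kafkaParams)
# each Kafka message is a (key, value) pair; count hits per department
departmentCount = stream.map(lambda m: m[1]). \
filter(lambda line: line.split(" ")[6].split("/")[1] == "department"). \
map(lambda line: (line.split(" ")[6].split("/")[2], 1)). \
reduceByKey(lambda x, y: x + y)
# first application argument is the output path passed to spark-submit above
departmentCount.saveAsTextFiles(sys.argv[1])
ssc.start()
ssc.awaitTermination()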
# wskafka.conf: A single-node Flume configuration
# to read data from webserver logs and publish
# to kafka topic
# Name the components on this agent
wk.sources = ws
wk.sinks = kafka
wk.channels = mem
# Describe/configure the source, sink, and channel
# (log path, broker list, and topic below are hypothetical placeholders)
wk.sources.ws.type = exec
wk.sources.ws.command = tail -F /opt/gen_logs/logs/access.log
wk.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
wk.sinks.kafka.brokerList = broker01:6667
wk.sinks.kafka.topic = retail_logs
wk.channels.mem.type = memory
# Bind the source and sink to the channel
wk.sources.ws.channels = mem
wk.sinks.kafka.channel = mem
spark-submit --master yarn \
--conf spark.ui.port=12890 \
--jars "/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming-flume_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming-flume-sink_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/flume/lib/flume-ng-sdk-1.5.2.2.5.0.0-1245.jar" \
src/main/python/StreamingFlumeDepartmentCount.py \
gw01.itversity.com 8123 /user/dgadiraju/streamingflumedepartmentcount/cnt
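StreamingFlumeDepartmentCount.py itself is not shown above. A minimal sketch, assuming the three arguments passed above are the Flume host, port, and output path prefix, and the same hypothetical web-log format as before:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
import sys
conf = SparkConf(). \
setAppName("Streaming Flume Department Count"). \
setMaster("yarn-client")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
stream = FlumeUtils.createStream(ssc, sys.argv[1], int(sys.argv[2]))
# each Flume event is a (headers, body) pair; count hits per department
departmentCount = stream.map(lambda e: e[1]). \
filter(lambda line: line.split(" ")[6].split("/")[1] == "department"). \
map(lambda line: (line.split(" ")[6].split("/")[2], 1)). \
reduceByKey(lambda x, y: x + y)
departmentCount.saveAsTextFiles(sys.argv[3])
ssc.start()
ssc.awaitTermination()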
daily_revenue_per_product_df.show(100)
# save() with a source name is the older Spark 1.x API; write.json() below is the preferred form
daily_revenue_per_product_df.save("/user/dgadiraju/daily_revenue_save", "json")
daily_revenue_per_product_df.write.json("/user/dgadiraju/daily_revenue_write")
daily_revenue_per_product_df.select("order_date", "daily_revenue_per_product").show()
daily_revenue_per_product_df.filter(daily_revenue_per_product_df["order_date"] == "2013-07-26 00:00:00.0").show()
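To verify either save, the JSON output can be read back into a DataFrame; a short sketch using the standard Spark 1.6 reader (the path matches the write above, the variable name is our own):
revisited_df = sqlContext.read.json("/user/dgadiraju/daily_revenue_write")
revisited_df.printSchema()
revisited_df.show()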
sqlContext.sql("CREATE DATABASE dgadiraju_daily_revenue");
sqlContext.sql("CREATE TABLE dgadiraju_daily_revenue.daily_revenue (order_date string, product_name string, daily_revenue_per_product float) STORED AS orc")
daily_revenue_per_product_df = sqlContext.sql("SELECT o.order_date, p.product_name, sum(oi.order_item_subtotal) daily_revenue_per_product \
FROM orders o JOIN order_items oi \
ON o.order_id = oi.order_item_order_id \
JOIN products p \
ON p.product_id = oi.order_item_product_id \
WHERE o.order_status IN ('COMPLETE', 'CLOSED') \
GROUP BY o.order_date, p.product_name")
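With the ORC table created above in place, the computed DataFrame can be loaded into it; a minimal sketch using the Spark 1.6 DataFrameWriter, assuming the column order matches the table definition:
daily_revenue_per_product_df.write. \
insertInto("dgadiraju_daily_revenue.daily_revenue")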
sqlContext.sql("use dgadiraju_retail_db_txt")
from pyspark.sql import Row
productsRaw = open("/data/retail_db/products/part-00000").read().splitlines()
productsRDD = sc.parallelize(productsRaw)
productsDF = productsRDD.\
map(lambda p: Row(product_id=int(p.split(",")[0]), product_name=p.split(",")[2])).\
toDF()
productsDF.registerTempTable("products")
sqlContext.sql("select * from products").show()
from pyspark.sql import Row
ordersRDD = sc.textFile("/public/retail_db/orders")
ordersDF = ordersRDD.\
map(lambda o: Row(order_id=int(o.split(",")[0]), order_date=o.split(",")[1], order_customer_id=int(o.split(",")[2]), order_status=o.split(",")[3])).toDF()
ordersDF.registerTempTable("ordersDF_table")
sqlContext.sql("select order_status, count(1) from ordersDF_table group by order_status").show()
select * from (
select o.order_id, o.order_date, o.order_status, oi.order_item_subtotal,
round(sum(oi.order_item_subtotal) over (partition by o.order_id), 2) order_revenue,
oi.order_item_subtotal/round(sum(oi.order_item_subtotal) over (partition by o.order_id), 2) pct_revenue,
round(avg(oi.order_item_subtotal) over (partition by o.order_id), 2) avg_revenue
from orders o join order_items oi
on o.order_id = oi.order_item_order_id
where o.order_status in ('COMPLETE', 'CLOSED')) q
where order_revenue >= 1000
order by order_date, order_revenue desc;
select o.order_id, o.order_date, o.order_status, round(sum(oi.order_item_subtotal), 2) order_revenue
from orders o join order_items oi
on o.order_id = oi.order_item_order_id
where o.order_status in ('COMPLETE', 'CLOSED')
group by o.order_id, o.order_date, o.order_status
having sum(oi.order_item_subtotal) >= 1000
order by o.order_date, order_revenue desc;
select o.order_id, o.order_date, o.order_status, round(sum(oi.order_item_subtotal), 2) order_revenue
from orders o join order_items oi