Skip to content

Instantly share code, notes, and snippets.

@dgadiraju
Created March 21, 2019 14:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dgadiraju/c535fea61d0133eee4ad544961fee0c1 to your computer and use it in GitHub Desktop.
package spark2demo;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.sum;
import static org.apache.spark.sql.functions.round;
/**
 * Spark 2 demo: computes daily revenue from the retail_db dataset.
 *
 * <p>Reads {@code orders} and {@code order_items} CSV files, keeps only orders
 * whose status is COMPLETE or CLOSED, joins them on the order id, and prints
 * the per-date sum of item subtotals rounded to two decimal places.
 */
public class GettingStarted {

  /**
   * Entry point.
   *
   * @param args optional; {@code args[0]} overrides the base data directory.
   *             Defaults to the original demo path when no argument is given,
   *             so existing invocations behave identically.
   */
  public static void main(String[] args) {
    // Generalized: base path is configurable, defaulting to the original location.
    String basePath = args.length > 0 ? args[0] : "/Users/itversity/Research/data/retail_db";

    SparkSession spark = SparkSession.
        builder().
        appName("Simple Application").
        master("local").
        getOrCreate();

    // try/finally guarantees the SparkContext is released even if a
    // transformation or action throws (the original leaked it on failure).
    try {
      // Cached because `orders` feeds the filter and (via the join) the aggregation.
      Dataset<Row> orders = spark.
          read().
          schema("order_id INT, order_date STRING, order_customer_id INT, order_status STRING").
          format("csv").
          load(basePath + "/orders").
          cache();

      // Only completed or closed orders count toward revenue.
      Dataset<Row> ordersFiltered = orders.
          filter("order_status IN ('COMPLETE', 'CLOSED')");

      Dataset<Row> orderItems = spark.
          read().
          schema("order_item_id INT, order_item_order_id INT, "
              + "order_item_product_id INT, order_item_quantity INT, "
              + "order_item_subtotal FLOAT, order_item_product_price FLOAT").
          format("csv").
          load(basePath + "/order_items");

      // Inner join: each surviving order paired with its line items.
      Dataset<Row> ordersJoin = ordersFiltered.
          join(orderItems,
              ordersFiltered.col("order_id").equalTo(orderItems.col("order_item_order_id")));

      // Daily revenue = sum of item subtotals per order_date, rounded to 2 places.
      ordersJoin.
          groupBy("order_date").
          agg(round(sum(ordersJoin.col("order_item_subtotal")), 2).alias("revenue")).
          show();
    } finally {
      spark.stop();
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment