@IgorBerman
Created December 18, 2019 14:21
structured streaming example: reads SLA reports from Kafka, buckets per-message delays into a histogram over 60-second event-time windows, and publishes the aggregates back to Kafka as JSON
package igorprivate;
import static org.apache.spark.sql.functions.concat_ws;
import static org.apache.spark.sql.functions.date_format;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.sum;
import static org.apache.spark.sql.functions.to_json;
import static org.apache.spark.sql.functions.when;
import static org.apache.spark.sql.functions.window;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Level;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;
public class SparkStructuredStreamingMainExample {
    public static void main(String[] args) throws Exception {
        org.apache.log4j.Logger.getRootLogger().setLevel(Level.INFO);
        SparkConf properties = new SparkConf();
        properties.set("spark.sql.session.timeZone", "UTC");
        properties.set("spark.default.parallelism", String.valueOf(8));
        properties.set("spark.sql.shuffle.partitions", String.valueOf(8));
        String master = System.getProperty("spark.master", "local[" + Runtime.getRuntime().availableProcessors() + "]");
        try (SparkSession spark = SparkSession.builder().config(properties).master(master).appName("my-awesome-spark").getOrCreate()) {
            spark.sparkContext().setLogLevel("INFO");
Dataset<Row> df = spark.readStream().format("kafka")
.option("subscribe", "raw_sla_reports")
.option("kafka.bootstrap.servers", "0.0.0.0:29092")
.option("groupidprefix", "my-awesome-group-spark-streaming-dev")
.load();
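            // Kafka delivers key/value as binary; cast the payload bytes to a JSON string.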
Dataset<Row> rowDataset = df.selectExpr("CAST(value AS STRING)");
            StructType schema = StructType.fromDDL("msgId string, sourceLabels map<string,string>, data array<struct<logicalTime: BIGINT, processingTime: BIGINT, labels: map<string,string>, value: INT>>");
            Dataset<Row> jsonDF = rowDataset.select(from_json(rowDataset.col("value"), schema, new HashMap<>()).as("r"));
            jsonDF = jsonDF.select("r.*");
            Dataset<Row> raw_sla_reports_exploded_stream = jsonDF.selectExpr("msgId", "sourceLabels", "EXPLODE(data) as data");
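            // Times arrive as epoch millis: divide by 1000 before casting to timestamp.
            // delay stays in milliseconds, which is what the bucket boundaries below assume.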
            Dataset<Row> d = raw_sla_reports_exploded_stream.selectExpr(
                    "msgId",
                    "sourceLabels['sourceGroup'] as sourceGroup",
                    "cast(data.logicalTime/1000 as timestamp) as logicalTime",
                    "cast(data.processingTime/1000 as timestamp) as processingTime",
                    "data.processingTime - data.logicalTime as delay",
                    "data.value as value",
                    "data.labels['msgtype'] as msgtype")
                    .withWatermark("logicalTime", "24 hours");
            // Important: the watermarked column must be part of the de-dup key,
            // otherwise Spark cannot expire de-duplication state and it grows without bound.
            d = d.dropDuplicates("msgId", "logicalTime");
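            // Aggregate: per 60-second event-time window, per source group and message type,
            // sum the message counts into delay buckets (boundaries in millis).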
            Dataset<Row> histo = d.groupBy(
                    window(d.col("logicalTime"), "60 seconds"),
                    d.col("sourceGroup"),
                    d.col("msgtype")
            ).agg(
                    sum(when(d.col("delay").leq(lit(60000L)),
                            d.col("value")).otherwise(lit(0))).alias("zero_to_one_min"),
                    sum(when(d.col("delay").gt(lit(60000L))
                                    .and(d.col("delay").leq(lit(300000L))),
                            d.col("value")).otherwise(lit(0))).alias("one_to_five_mins"),
                    sum(when(d.col("delay").gt(lit(300000L))
                                    .and(d.col("delay").leq(lit(600000L))),
                            d.col("value")).otherwise(lit(0))).alias("five_to_ten_mins"),
                    sum(when(d.col("delay").gt(lit(600000L))
                                    .and(d.col("delay").leq(lit(1800000L))),
                            d.col("value")).otherwise(lit(0))).alias("ten_to_thirty_mins"),
                    sum(when(d.col("delay").gt(lit(1800000L)),
                            d.col("value")).otherwise(lit(0))).alias("more_than_thirty_mins"),
                    sum("value").alias("total_protos")
            );
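            // Flatten for output: keep the aggregate columns and replace the window struct
            // with a formatted window start timestamp.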
            Dataset<Row> histo_projected = histo.selectExpr(
                    "sourceGroup",
                    "msgtype",
                    "window",
                    "zero_to_one_min",
                    "one_to_five_mins",
                    "five_to_ten_mins",
                    "ten_to_thirty_mins",
                    "more_than_thirty_mins",
                    "total_protos"
            ).withColumn("window_start", date_format(histo.col("window.start"), "yyyy-MM-dd-HH-mm-ss")).drop("window");
            //histo_projected.printSchema();
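            // aggKey uniquely identifies one (sourceGroup, msgtype, window_start) cell;
            // in Update mode a cell may be re-emitted on later triggers, so downstream
            // consumers can use this key to keep only the latest version.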
            Dataset<Row> histo_projected_with_agg_key = histo_projected.withColumn("aggKey",
                    concat_ws("_", histo_projected.col("sourceGroup"), histo_projected.col("msgtype"), histo_projected.col("window_start")));
            //histo_projected_with_agg_key.printSchema();
            Dataset<Row> forOutputTopic = histo_projected_with_agg_key.select(
                    to_json(struct(histo_projected_with_agg_key.col("*"))).alias("value")
            ); // the Kafka sink requires a string or binary column named "value" (a "key" column is optional)
            //forOutputTopic.printSchema();
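            // Sink: publish each updated aggregate as a JSON record, checking for new data
            // every 10 seconds. The checkpoint directory makes the query restartable;
            // use a durable path instead of /tmp in production.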
            StreamingQuery streamingQuery =
                    forOutputTopic.writeStream().format("kafka")
                            .option("kafka.bootstrap.servers", "0.0.0.0:29092")
                            .option("topic", "delays_per_group_histo_table")
                            .option("checkpointLocation", "/tmp/checkpoint")
                            .outputMode(OutputMode.Update())
                            .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
                            .start();
            streamingQuery.awaitTermination();
        }
    }
}
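
For reference, here is what the messages might look like end to end. The input shape follows the DDL schema above; all concrete values are invented for illustration:

{"msgId":"m-1","sourceLabels":{"sourceGroup":"groupA"},"data":[{"logicalTime":1576600000000,"processingTime":1576600030000,"labels":{"msgtype":"heartbeat"},"value":3}]}

The delay here is 30000 ms (<= 60000), so the value 3 is counted toward zero_to_one_min, and the record written to delays_per_group_histo_table would look like:

{"sourceGroup":"groupA","msgtype":"heartbeat","zero_to_one_min":3,"one_to_five_mins":0,"five_to_ten_mins":0,"ten_to_thirty_mins":0,"more_than_thirty_mins":0,"total_protos":3,"window_start":"2019-12-17-16-26-00","aggKey":"groupA_heartbeat_2019-12-17-16-26-00"}

To run this, the Kafka connector must be on the classpath, e.g. (assuming Spark 2.4 / Scala 2.11, contemporary with this gist; match the versions of your build, and my-app.jar is a placeholder):

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 --class igorprivate.SparkStructuredStreamingMainExample my-app.jar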