Apache Spark: Structured Streaming - Practical Example 2

This gist pairs two sample CSV input files of hotel bookings with the Scala application that consumes them as a stream.
Contents of the first input file (column names taken from the schema defined in the code below):

| id | bookingDate | stayDate | price |
|---|---|---|---|
| 0 | 2024-05-13 00:00:00 | 2024-05-15 00:00:00 | 100.0 |
| 0 | 2024-05-13 00:00:00 | 2024-05-16 00:00:00 | 100.0 |
| 1 | 2024-05-13 00:00:00 | 2024-05-24 00:00:00 | 220.5 |
| 1 | 2024-05-13 00:00:00 | 2024-05-25 00:00:00 | 220.5 |
| 1 | 2024-05-13 00:00:00 | 2024-05-26 00:00:00 | 220.5 |
| 1 | 2024-05-13 00:00:00 | 2024-05-27 00:00:00 | 220.5 |
| 2 | 2024-05-13 00:00:00 | 2024-07-03 00:00:00 | 66.25 |
Contents of the second input file:

| id | bookingDate | stayDate | price |
|---|---|---|---|
| 1 | 2024-05-13 00:00:00 | 2024-05-23 00:00:00 | 240.5 |
| 1 | 2024-05-13 00:00:00 | 2024-05-28 00:00:00 | 240.5 |
| 3 | 2024-05-14 00:00:00 | 2024-07-03 00:00:00 | 110.0 |
| 3 | 2024-05-14 00:00:00 | 2024-07-03 00:00:00 | 110.0 |
| 3 | 2024-05-14 00:00:00 | 2024-07-03 00:00:00 | 110.0 |
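GitHub renders these files as tables, but on disk they are plain, header-less CSVs, which is why the reader in the code below sets `sep` to `","` and `header` to `"false"`. The second file, for instance, would contain something like:

```csv
1,2024-05-13 00:00:00,2024-05-23 00:00:00,240.5
1,2024-05-13 00:00:00,2024-05-28 00:00:00,240.5
3,2024-05-14 00:00:00,2024-07-03 00:00:00,110.0
3,2024-05-14 00:00:00,2024-07-03 00:00:00,110.0
3,2024-05-14 00:00:00,2024-07-03 00:00:00,110.0
```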
The streaming application that consumes these files:

```scala
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StructType, TimestampType, DoubleType}

import java.sql.Timestamp

object StreamingTest {

  case class Booking(id: Int, bookingDate: Timestamp, stayDate: Timestamp, price: Double)

  def main(args: Array[String]): Unit = {
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkSession using every core of the local machine
    val spark = SparkSession
      .builder
      .appName("StreamingTest")
      .master("local[*]")
      .getOrCreate()

    // Schema used to parse the booking CSV files
    val bookingSchema = new StructType()
      .add("id", IntegerType, nullable = false)
      .add("bookingDate", TimestampType, nullable = false)
      .add("stayDate", TimestampType, nullable = false)
      .add("price", DoubleType, nullable = false)

    // Define the streaming source: header-less CSV files in data/bookings/,
    // picking up at most one file per micro-batch
    import spark.implicits._
    val bookings = spark
      .readStream
      .schema(bookingSchema)
      .option("maxFilesPerTrigger", 1)
      .option("sep", ",")
      .option("header", "false")
      .csv("data/bookings/")
      .as[Booking]

    // Group by booking identifier and compute the total price
    // and the earliest stay date per booking
    val groupedBookings = bookings
      .groupBy("id")
      .agg(sum("price").alias("totalPrice"), min("stayDate").alias("startDate"))

    // Sink: in complete output mode, the console shows the entire
    // re-aggregated result table after every micro-batch
    groupedBookings
      .repartition(1)
      .writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}
```
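With `maxFilesPerTrigger` set to 1, each input file is processed as its own micro-batch, and `complete` output mode reprints the whole aggregated table after each one. Once both sample files have been picked up, the totals follow directly from the rows above (for example, id 1 appears in both files: 4 × 220.5 + 2 × 240.5 = 1363.0), so the console output should converge to:

| id | totalPrice | startDate |
|---|---|---|
| 0 | 200.0 | 2024-05-15 00:00:00 |
| 1 | 1363.0 | 2024-05-23 00:00:00 |
| 2 | 66.25 | 2024-07-03 00:00:00 |
| 3 | 330.0 | 2024-07-03 00:00:00 |

Because Structured Streaming shares the DataFrame API with batch Spark, the same totals can be sanity-checked with a one-off batch query. This is a minimal sketch, not part of the original gist, assuming it runs where `spark` and `bookingSchema` from the listing above are in scope (`staticResult` is a hypothetical name):

```scala
// Hypothetical sanity check: run the same aggregation as a static batch
// query over the same directory the stream watches.
val staticResult = spark.read
  .schema(bookingSchema)
  .csv("data/bookings/")
  .groupBy("id")
  .agg(sum("price").alias("totalPrice"), min("stayDate").alias("startDate"))

staticResult.orderBy("id").show()
```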