@Agustin-Mora-Damavis
Created July 8, 2024 08:43
Apache Spark: Structured Streaming - Practical Example 2
Sample booking data read by the job below (CSV under data/bookings/; columns: id, bookingDate, stayDate, price):

0,2024-05-13 00:00:00,2024-05-15 00:00:00,100.0
0,2024-05-13 00:00:00,2024-05-16 00:00:00,100.0
1,2024-05-13 00:00:00,2024-05-24 00:00:00,220.5
1,2024-05-13 00:00:00,2024-05-25 00:00:00,220.5
1,2024-05-13 00:00:00,2024-05-26 00:00:00,220.5
1,2024-05-13 00:00:00,2024-05-27 00:00:00,220.5
2,2024-05-13 00:00:00,2024-07-03 00:00:00,66.25
1,2024-05-13 00:00:00,2024-05-23 00:00:00,240.5
1,2024-05-13 00:00:00,2024-05-28 00:00:00,240.5
3,2024-05-14 00:00:00,2024-07-03 00:00:00,110.0
3,2024-05-14 00:00:00,2024-07-03 00:00:00,110.0
3,2024-05-14 00:00:00,2024-07-03 00:00:00,110.0
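
To drive the stream locally, the rows above need to exist as CSV files inside data/bookings/, the directory the job below reads from. Since the source is configured with maxFilesPerTrigger = 1, each file dropped into the directory is picked up as its own micro-batch. The following is a minimal sketch of one way to materialize a couple of such files; the file names bookings_1.csv and bookings_2.csv and the split of rows between them are arbitrary choices, not part of the original example.

import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets

object WriteSampleBookings {
  def main(args: Array[String]): Unit = {
    // Create the input directory expected by the streaming job
    val dir = Paths.get("data/bookings")
    Files.createDirectories(dir)

    // Two hypothetical files; each one becomes a separate micro-batch
    val file1 =
      """0,2024-05-13 00:00:00,2024-05-15 00:00:00,100.0
        |0,2024-05-13 00:00:00,2024-05-16 00:00:00,100.0
        |1,2024-05-13 00:00:00,2024-05-24 00:00:00,220.5
        |""".stripMargin
    val file2 =
      """1,2024-05-13 00:00:00,2024-05-23 00:00:00,240.5
        |3,2024-05-14 00:00:00,2024-07-03 00:00:00,110.0
        |""".stripMargin

    Files.write(dir.resolve("bookings_1.csv"), file1.getBytes(StandardCharsets.UTF_8))
    Files.write(dir.resolve("bookings_2.csv"), file2.getBytes(StandardCharsets.UTF_8))
  }
}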
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StructType, TimestampType, DoubleType}
import java.sql.Timestamp

object StreamingTest {

  case class Booking(id: Int, bookingDate: Timestamp, stayDate: Timestamp, price: Double)

  def main(args: Array[String]): Unit = {
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkSession using every core of the local machine
    val spark = SparkSession
      .builder
      .appName("StreamingTest")
      .master("local[*]")
      .getOrCreate()

    // Define the schema used to read bookings from files
    val bookingSchema = new StructType()
      .add("id", IntegerType, nullable = false)
      .add("bookingDate", TimestampType, nullable = false)
      .add("stayDate", TimestampType, nullable = false)
      .add("price", DoubleType, nullable = false)

    // Define the data source to read streaming data
    import spark.implicits._
    val bookings = spark
      .readStream
      .schema(bookingSchema)
      .option("maxFilesPerTrigger", 1)
      .option("sep", ",")
      .option("header", "false")
      .csv("data/bookings/")
      .as[Booking]

    // Group by booking identifier and obtain the total price and the first stay date
    val groupedBookings = bookings
      .groupBy("id")
      .agg(sum("price").alias("totalPrice"), min("stayDate").alias("startDate"))

    // Define the sink where the data will be written
    groupedBookings
      .repartition(1)
      .writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}
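
Because the sink runs in complete output mode, every micro-batch prints the full aggregated state seen so far, and once all the sample rows above have been ingested the output should converge to: id 0 → totalPrice 200.0, startDate 2024-05-15; id 1 → 1363.0 (4 × 220.5 + 2 × 240.5), 2024-05-23; id 2 → 66.25, 2024-07-03; id 3 → 330.0, 2024-07-03. One way to sanity-check those numbers is to run the same aggregation as a plain batch job over the same directory. A sketch, assuming the CSV files are already in data/bookings/ and the object name BatchCheck is arbitrary:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StructType, TimestampType, DoubleType}

object BatchCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("BatchCheck").master("local[*]").getOrCreate()

    // Same schema as the streaming job
    val bookingSchema = new StructType()
      .add("id", IntegerType, nullable = false)
      .add("bookingDate", TimestampType, nullable = false)
      .add("stayDate", TimestampType, nullable = false)
      .add("price", DoubleType, nullable = false)

    // Batch read of the same directory, followed by the same aggregation
    spark.read
      .schema(bookingSchema)
      .option("sep", ",")
      .option("header", "false")
      .csv("data/bookings/")
      .groupBy("id")
      .agg(sum("price").alias("totalPrice"), min("stayDate").alias("startDate"))
      .show()

    spark.stop()
  }
}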