Skip to content

Instantly share code, notes, and snippets.

@TomLous
Last active June 14, 2022 14:10
Show Gist options
  • Save TomLous/8c4c2b37e7ed44c3ef6300a65b1022f6 to your computer and use it in GitHub Desktop.
Save TomLous/8c4c2b37e7ed44c3ef6300a65b1022f6 to your computer and use it in GitHub Desktop.
package functions.spark
import java.sql.Timestamp
import java.time.Instant
import java.util.UUID
/** Wrapper around a payload decoded from the JSON `value` column of an incoming record.
  *
  * @param value the decoded payload
  * @tparam Out  the output row type produced by [[convert]]
  * @tparam In   the decoded payload type, which knows how to convert itself into an `Out`
  */
case class CDFRecord[Out <: Product, In <: Convertable[Out]](value: In) {

  /** Converts the wrapped payload, stamping it with ingestion metadata.
    *
    * The current wall-clock time and a fresh random UUID are generated on every call, so two
    * calls on the same record produce different `ingestedAt` / `ingestedId` values.
    */
  def convert: Out =
    value.convert(
      ingestedAt = Timestamp.from(Instant.now()),
      ingestedId = UUID.randomUUID().toString
    )
}
package functions.spark
import java.sql.Timestamp
/** A decoded payload that can turn itself into an output row of type `T`.
  *
  * Extending `Product` means implementors must themselves be products (in practice, case
  * classes), which lets their schema be derived with `Encoders.product`.
  *
  * @tparam T the output row type
  */
abstract class Convertable[T <: Product] extends Product {

  /** Builds the output row, attaching the supplied ingestion metadata.
    *
    * @param ingestedAt the ingestion time to record on the output row
    * @param ingestedId the ingestion identifier to record on the output row
    * @return the converted row
    */
  def convert(ingestedAt: Timestamp, ingestedId: String): T
}
package functions.spark
import java.sql.Timestamp
/** Incoming payload, as carried in the JSON `value` column of a raw record.
  *
  * Field names keep their upstream upper-snake-case form because the decoding schema (and the
  * JSON keys) are derived directly from them.
  */
case class InItem(
    RAW_NAME: String,
    RAW_VALUE: Int,
    OPTIONAL_RAW_DOUBLE: Option[Double],
    SOME_TIME: Timestamp
) extends Convertable[OutItem] {

  /** Projects this payload onto an [[OutItem]].
    *
    *  - `OPTIONAL_RAW_DOUBLE` becomes a `BigDecimal`, defaulting to zero when absent.
    *  - `SOME_TIME` and `ingestedAt` are flattened to epoch milliseconds via `getTime`.
    *
    * NOTE(review): `SOME_TIME.getTime` throws if `SOME_TIME` is null (e.g. the key was absent
    * from the JSON). Confirm that upstream always supplies it.
    */
  override def convert(ingestedAt: Timestamp, ingestedId: String): OutItem = {
    val decimalValue: BigDecimal =
      OPTIONAL_RAW_DOUBLE.fold(BigDecimal(0.0))(raw => BigDecimal.valueOf(raw))

    OutItem(
      name = RAW_NAME,
      int_value = RAW_VALUE,
      decimal = decimalValue,
      timestamp = SOME_TIME.getTime,
      ingested_at = ingestedAt.getTime,
      ingest_id = ingestedId
    )
  }
}
package functions.spark
/** Flattened output row.
  *
  * Field names are the output column names, so they are kept in snake_case.
  *
  * @param name        record name
  * @param int_value   integer value carried over from the input
  * @param decimal     decimal value (zero when the input had none, as built by `InItem.convert`)
  * @param timestamp   event time in epoch milliseconds (as built by `InItem.convert`)
  * @param ingested_at ingestion time in epoch milliseconds
  * @param ingest_id   ingestion identifier
  */
case class OutItem(
    name: String,
    int_value: Int,
    decimal: BigDecimal,
    timestamp: Long,
    ingested_at: Long,
    ingest_id: String
)
import functions.spark.{InItem, OutItem}
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions._
import org.json4s._
import org.json4s.jackson.Serialization.{read, write}
import scala.reflect.runtime.universe.TypeTag
import java.sql.Timestamp
import java.time.Instant
// Local Spark session backing this worksheet ("local[3]" = three local worker threads).
val spark: SparkSession = SparkSession
  .builder()
  .master("local[3]")
  .appName("test3")
  .config("spark.ui.showConsoleProgress", true)
  .getOrCreate()
import spark.implicits._

// json4s formats picked up implicitly by `write` when serialising the sample payloads below.
implicit val formats: Formats = DefaultFormats

// Fake raw input: each InItem is serialised to a JSON string in `value`, next to columns named
// key/topic/partition/offset/timestamp. `key` is a bare null, so Spark types it as `void`.
val incomingStream = List(
  InItem("A", 1, None, Timestamp.valueOf("2022-06-13 01:00:00")),
  InItem("B", 2, Some(2.3333333333), Timestamp.valueOf("2022-06-13 02:00:00")),
  InItem("C", 3, Some(33.0), Timestamp.valueOf("2022-06-13 03:00:00"))
).zipWithIndex
  .map { case (in, idx) =>
    (null, write(in), "incoming", 0, idx, Timestamp.from(Instant.now()))
  }
  .toDF("key", "value", "topic", "partition", "offset", "timestamp")

// Inspect the fake input. `printSchema()` is declared with parens, so call it with them
// (auto-application is deprecated in Scala 2.13 and rejected by Scala 3).
incomingStream.printSchema()
incomingStream.show(truncate = false)
/*
root
|-- key: void (nullable = true)
|-- value: string (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = false)
|-- offset: integer (nullable = false)
|-- timestamp: timestamp (nullable = true)
res0: Unit = ()
+----+----------------------------------------------------------------------------------------------------+--------+---------+------+-----------------------+
|key |value |topic |partition|offset|timestamp |
+----+----------------------------------------------------------------------------------------------------+--------+---------+------+-----------------------+
|null|{"RAW_NAME":"A","RAW_VALUE":1,"SOME_TIME":"2022-06-12T23:00:00Z"} |incoming|0 |0 |2022-06-13 15:41:54.079|
|null|{"RAW_NAME":"B","RAW_VALUE":2,"OPTIONAL_RAW_DOUBLE":2.3333333333,"SOME_TIME":"2022-06-13T00:00:00Z"}|incoming|0 |1 |2022-06-13 15:41:54.081|
|null|{"RAW_NAME":"C","RAW_VALUE":3,"OPTIONAL_RAW_DOUBLE":33.0,"SOME_TIME":"2022-06-13T01:00:00Z"} |incoming|0 |2 |2022-06-13 15:41:54.082|
+----+----------------------------------------------------------------------------------------------------+--------+---------+------+-----------------------+
*/
/** Decodes the JSON `value` column of an incoming frame into `In` payloads and converts each
  * one into an `Out` row.
  *
  * @tparam Out output row type; its Dataset encoder comes from `spark.implicits._`
  * @tparam In  payload type parsed from JSON; its schema is derived with `Encoders.product`
  */
class Transformer[Out <: Product : TypeTag, In <: Convertable[Out] : TypeTag] {
  // The worksheet's top-level imports bring in neither of these names, so import them here.
  // `_root_` guards against any other `functions` in scope.
  import _root_.functions.spark.CDFRecord
  import org.apache.spark.sql.DataFrame

  /** Parses `incoming.value` as JSON with `In`'s schema, wraps it as a [[CDFRecord]] and
    * converts it to `Out`, stamping ingestion metadata per record.
    *
    * @param incoming frame with a string column named `value` holding JSON payloads
    * @return a frame whose columns are the fields of `Out`
    *
    * NOTE(review): a `value` that does not parse yields a null struct, which would presumably
    * make `convert` fail with a NullPointerException. Confirm whether such rows should be
    * filtered out first.
    */
  def transform(incoming: DataFrame): DataFrame =
    incoming
      // `col` replaces the symbol literal `'value`, which is deprecated in 2.13 and gone in Scala 3.
      .select(from_json(col("value"), Encoders.product[In].schema).as("value"))
      .as[CDFRecord[Out, In]]
      .map(_.convert)
      .toDF()
}
// Decode the InItem payloads of the fake stream and convert them into OutItem rows.
val transformedStream = new Transformer[OutItem, InItem].transform(incomingStream)

// Inspect the result. `printSchema()` is declared with parens, so call it with them.
transformedStream.printSchema()
transformedStream.show(truncate = false)
/*
root
|-- name: string (nullable = true)
|-- int_value: integer (nullable = false)
|-- decimal: decimal(38,18) (nullable = true)
|-- timestamp: long (nullable = false)
|-- ingested_at: long (nullable = false)
|-- ingest_id: string (nullable = true)
res2: Unit = ()
+----+---------+---------------------+-------------+-------------+------------------------------------+
|name|int_value|decimal |timestamp |ingested_at |ingest_id |
+----+---------+---------------------+-------------+-------------+------------------------------------+
|A |1 |0E-18 |1655074800000|1655213536030|1229a835-58e2-4e55-b94e-7443cff5cee4|
|B |2 |2.333333333300000000 |1655078400000|1655213536238|25faca42-b4f7-41c2-9936-30791c1c6139|
|C |3 |33.000000000000000000|1655082000000|1655213536238|c9abc8b7-2a65-445e-9574-328d557f700e|
+----+---------+---------------------+-------------+-------------+------------------------------------+
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment