Skip to content

Instantly share code, notes, and snippets.

@DCameronMauch
Created October 8, 2020 21:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DCameronMauch/25cd147e7cfd469e0d724931a551f041 to your computer and use it in GitHub Desktop.
Save DCameronMauch/25cd147e7cfd469e0d724931a551f041 to your computer and use it in GitHub Desktop.
Attempt to serialize an ADT (a sealed trait of singleton objects) in a Spark Dataset using a hand-built ExpressionEncoder
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.unsafe.types.UTF8String
import scala.reflect.ClassTag
import spark.implicits._
/**
 * A day of the week, modelled as a closed set of singleton objects declared in
 * the companion object.
 */
sealed trait DayOfWeek {
  /**
   * String form of this member: the JVM binary name of its runtime class. It
   * therefore carries any enclosing package/wrapper prefix and the trailing `$`
   * Scala gives object classes. The companion's `apply` maps this string back
   * to the member.
   */
  val value: String = getClass.getName
}
/**
 * Companion of [[DayOfWeek]]: the seven members, a string-to-member lookup,
 * and a hand-built Spark [[Encoder]] for the ADT.
 */
object DayOfWeek {
  final object Sunday extends DayOfWeek
  final object Monday extends DayOfWeek
  final object Tuesday extends DayOfWeek
  final object Wednesday extends DayOfWeek
  final object Thursday extends DayOfWeek
  final object Friday extends DayOfWeek
  final object Saturday extends DayOfWeek

  /** Every member, in order from Sunday to Saturday. */
  val members: List[DayOfWeek] = List(Sunday, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday)

  // Built once rather than on every apply() call.
  // Must stay below `members`, which it reads during object initialization.
  private val byValue: Map[String, DayOfWeek] =
    members.map(member => member.value -> member).toMap

  /**
   * Returns the member whose `value` equals the given string.
   *
   * @param value a string previously produced by some member's `value`
   * @return the matching member
   * @throws NoSuchElementException if no member has that `value`
   */
  def apply(value: String): DayOfWeek =
    byValue.getOrElse(
      value,
      throw new NoSuchElementException(
        s"Unknown DayOfWeek value '$value'; expected one of: ${members.map(_.value).mkString(", ")}"
      )
    )

  private val clazz: Class[DayOfWeek] = classOf[DayOfWeek]

  // The object being encoded, read from ordinal 0 of the encoder's input.
  // It is declared non-nullable, so a null DayOfWeek is not handled here.
  private val inputObject: BoundReference = BoundReference(0, ObjectType(clazz), false)

  // Calls `value` on the input object, then converts the java.lang.String into
  // Spark's internal UTF8String via the static UTF8String.fromString.
  private val converter = StaticInvoke(
    classOf[UTF8String],
    StringType,
    "fromString",
    Invoke(inputObject, "value", ObjectType(classOf[String])) :: Nil
  )

  // The converted string, aliased as the single field "value".
  private val serializer: Seq[Expression] = CreateNamedStruct(Literal("value") :: converter :: Nil).flatten

  // NOTE(review): no deserializer is implemented, so this encoder can only
  // write DayOfWeek values. Any path that turns rows back into DayOfWeek
  // (collect, as[DayOfWeek], ...) will presumably fail on this null.
  // TODO: read the "value" column and route it through DayOfWeek.apply.
  private val deserializer: Expression = null

  /**
   * Flat encoder that stores a DayOfWeek as one non-nullable StringType column
   * named "value", holding the member's `value`.
   *
   * This is a `def`, so a new encoder instance is built on each call.
   */
  implicit def encoder: Encoder[DayOfWeek] = new ExpressionEncoder[DayOfWeek](
    schema = StructType(Seq(StructField("value", StringType, false))),
    flat = true,
    serializer = serializer,
    deserializer = deserializer,
    clsTag = ClassTag(clazz)
  )
}
/**
 * Example row type that mixes primitive fields with the [[DayOfWeek]] ADT.
 *
 * NOTE(review): a Dataset[Payload] needs an Encoder[Payload]. One derived
 * through `spark.implicits._` presumably does not consult `DayOfWeek.encoder`
 * for the nested `dayOfWeek` field. Confirm that `toDS` on Payload values
 * actually succeeds.
 *
 * @param id        numeric identifier of the row
 * @param name      free-form label
 * @param dayOfWeek one of the DayOfWeek members
 */
case class Payload(
  id: Int,
  name: String,
  dayOfWeek: DayOfWeek
)
import DayOfWeek._

// Three sample rows, each tagged with a different DayOfWeek member. They are
// lifted into a Dataset via `toDS` from spark.implicits._.
val ds1: Dataset[Payload] = {
  val rows = Seq(
    Payload(1, "abc", Monday),
    Payload(2, "def", Tuesday),
    Payload(3, "ghi", Wednesday)
  )
  rows.toDS
}

// `z` is not defined here; presumably the notebook's (Zeppelin) display context.
z.show(ds1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment