Skip to content

Instantly share code, notes, and snippets.

@vil1 vil1/schema.scala Secret
Last active Mar 13, 2019

Embed
What would you like to do?
import matryoshka._
import matryoshka.implicits._
import scalaz.Functor
sealed trait Type
case object IntType extends Type
case object StringType extends Type
final case class Struct(name: String, fields: Vector[(String, Type)]) extends Type
final case class Array(elementType: Type) extends Type
sealed trait TypeF[+A]
sealed trait TerminalType extends TypeF[Nothing]
case object IntF extends TerminalType
case object StringF extends TerminalType
final case class StructF[A](name: String, fields: Vector[(String, A)]) extends TypeF[A]
final case class ArrayF[A](elementType: A) extends TypeF[A]
object TypeF {
import scalaz._, Scalaz._
import scala.language.higherKinds
implicit val typeFunctor = new Functor[TypeF] {
override def map[A, B](fa: TypeF[A])(f: (A) => B): TypeF[B] = fa match {
case t: TerminalType => t
case StructF(name, fields) => StructF(name, fields map { case (k, v) => k -> f(v) })
case ArrayF(elementType) => ArrayF(f(elementType))
}
}
implicit val typeTraverse = new Traverse[TypeF] {
override def traverseImpl[G[_], A, B](fa: TypeF[A])(f: (A) => G[B])(implicit G: Applicative[G]): G[TypeF[B]] =
fa match {
case t: TerminalType => G.point(t)
case StructF(name, fields) =>
val (keys, values) = fields.unzip
values.traverse(f).map(vs => StructF(name, keys zip vs))
case ArrayF(elementType) => f(elementType).map(ArrayF.apply)
}
}
implicit val typeRecursive: Recursive.Aux[Type, TypeF] = new Recursive[Type] {
type Base[A] = TypeF[A]
override def project(t: Type)(implicit BF: Functor[Base]): BaseT[Type] = t match {
case IntType => IntF
case StringType => StringF
case Struct(name, fields) => StructF(name, fields)
case Array(elementType) => ArrayF(elementType)
}
}
def translateSchema[M[_]: Monad, T](schema: Type)(implicit recursive: Recursive.Aux[Type, TypeF],
algebra: AlgebraM[M, TypeF, T]): M[T] =
schema.cataM(algebra)
}
object Spark {
import TypeF._
import org.apache.spark.sql.types.{StringType => StringField, _}
import scalaz.Id.Id
implicit val toSparkAlgebra: Algebra[TypeF, DataType] = {
case ArrayF(elementType) => ArrayType(elementType)
case StructF(_, fields) => StructType(fields.map { case (k, v) => StructField(k, v) })
case IntF => IntegerType
case StringF => StringField
}
def convertSchema(fieldType: Type) = translateSchema[Id, DataType](fieldType)
}
object Avro {
import TypeF._
import org.apache.avro._
import scalaz.Id.Id
def toAvroAlgebra(namespace: String): Algebra[TypeF, Schema] = {
case StructF(name, fields) =>
fields
.foldLeft(SchemaBuilder.builder(namespace).record(name).fields()) {
case (builder, (n, schema)) =>
builder.name(n).`type`(schema).noDefault()
}
.endRecord()
case ArrayF(elementType) => SchemaBuilder.array().items(elementType)
case IntF => Schema.create(Schema.Type.INT)
case StringF => Schema.create(Schema.Type.STRING)
}
def convertSchema(fieldType: Type, namespace: String) = {
implicit val algebra = toAvroAlgebra(namespace)
translateSchema[Id, Schema](fieldType)
}
}
sealed trait Data
final case class GInt(value: Int) extends Data
final case class GString(value: String) extends Data
final case class GStruct(fields: Vector[(String, Data)]) extends Data
final case class GArray(items: Vector[Data]) extends Data
sealed trait DataF[+A]
sealed trait TerminalData extends DataF[Nothing]
final case class GIntF(value: Int) extends TerminalData
final case class GStringF(value: String) extends TerminalData
final case class GStructF[A](fields: List[(String, A)]) extends DataF[A]
final case class GArrayF[A](items: Vector[A]) extends DataF[A]
object DataF {
implicit val dataFunctor = new Functor[DataF] {
override def map[A, B](fa: DataF[A])(f: (A) => B): DataF[B] = fa match {
case t: TerminalData => t
case GStructF(fields) => GStructF(fields.map { case (k, v) => k -> f(v) })
case GArrayF(items) => GArrayF(items map f)
}
}
}
object Validation {
import scalaz._, Scalaz._
import jto.validation._, jsonast._, Rules._
import TypeF._
type JRule[A] = Rule[JValue, A]
implicit val jRuleApplicative = new Applicative[JRule] {
override def point[A](a: => A): JRule[A] = Rule.pure(a)
override def ap[A, B](fa: => JRule[A])(f: => JRule[(A) => B]): JRule[B] = fa ap f
}
implicit val schemaToJRule: Algebra[TypeF, JRule[Data]] = {
case StructF(_, fields) =>
fields
.traverse[JRule, (String, Data)] {
case (k, r) =>
(Path \ k).read[JValue, JValue, Data](r).map(k -> _)
}
.map(GStruct)
case ArrayF(rule) =>
pickSeq(rule).map(d => GArray(d.toVector))
case IntF => Rule.of[JValue, Int].map(GInt)
case StringF => Rule.of[JValue, String].map(GString)
}
def convertSchema(schema: Type) = translateSchema[Id, JRule[Data]](schema)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.