Create a gist now

Instantly share code, notes, and snippets.

@vil1 /schema.scala Secret
Last active Nov 30, 2017

What would you like to do?
import matryoshka._
import matryoshka.implicits._
import scalaz.Functor
/** Directly recursive description of a schema: a node is an int, a string, a named struct or an array. */
sealed trait Type
/** Leaf node standing for an integer field. */
case object IntType extends Type
/** Leaf node standing for a string field. */
case object StringType extends Type
/** Named record; `fields` pairs each field name with the schema of that field. */
final case class Struct(name: String, fields: Vector[(String, Type)]) extends Type
/** Array whose elements are described by `elementType`. NOTE(review): this name hides `scala.Array` wherever this definition is in scope. */
final case class Array(elementType: Type) extends Type
/** Non-recursive counterpart of `Type`: the positions that hold a nested `Type` there hold an `A` here. */
sealed trait TypeF[+A]
/** Shapes that contain no `A`; being `TypeF[Nothing]`, they fit any `TypeF[A]` thanks to covariance. */
sealed trait TerminalType extends TypeF[Nothing]
/** Counterpart of `IntType`. */
case object IntF extends TerminalType
/** Counterpart of `StringType`. */
case object StringF extends TerminalType
/** Counterpart of `Struct`; each field name is paired with an `A` instead of a nested schema. */
final case class StructF[A](name: String, fields: Vector[(String, A)]) extends TypeF[A]
/** Counterpart of `Array`; the element description is an `A`. */
final case class ArrayF[A](elementType: A) extends TypeF[A]
/**
  * Type-class instances for [[TypeF]] and the generic schema fold built on them.
  *
  * Every implicit carries an explicit type so that it is visible independently of
  * definition order and stays valid under Scala 3, which rejects untyped implicits.
  */
object TypeF {
  import scalaz._, Scalaz._
  import scala.language.higherKinds

  /** Applies `f` to every `A` held by a node; terminal nodes are returned as they are. */
  implicit val typeFunctor: Functor[TypeF] = new Functor[TypeF] {
    override def map[A, B](fa: TypeF[A])(f: (A) => B): TypeF[B] = fa match {
      case t: TerminalType       => t
      case StructF(name, fields) => StructF(name, fields map { case (k, v) => k -> f(v) })
      case ArrayF(elementType)   => ArrayF(f(elementType))
    }
  }

  /** Effectful counterpart of [[typeFunctor]]: runs `f` on every `A` and rebuilds the node inside `G`. */
  implicit val typeTraverse: Traverse[TypeF] = new Traverse[TypeF] {
    override def traverseImpl[G[_], A, B](fa: TypeF[A])(f: (A) => G[B])(implicit G: Applicative[G]): G[TypeF[B]] =
      fa match {
        case t: TerminalType => G.point(t)
        case StructF(name, fields) =>
          // Traverse the (name, value) pairs directly so each name stays attached to
          // its own result; no separate unzip / zip step is needed.
          fields
            .traverse[G, (String, B)] { case (k, v) => f(v).map(k -> _) }
            .map(StructF(name, _))
        case ArrayF(elementType) => f(elementType).map(ArrayF.apply)
      }
  }

  /** Exposes one layer of a [[Type]] as a [[TypeF]] whose children are still plain `Type` values. */
  implicit val typeRecursive: Recursive.Aux[Type, TypeF] = new Recursive[Type] {
    type Base[A] = TypeF[A]

    override def project(t: Type)(implicit BF: Functor[Base]): BaseT[Type] = t match {
      case IntType              => IntF
      case StringType           => StringF
      case Struct(name, fields) => StructF(name, fields)
      case Array(elementType)   => ArrayF(elementType)
    }
  }

  /**
    * Folds `schema` with the implicit monadic algebra by delegating to `cataM`.
    *
    * @tparam M effect in which the algebra runs
    * @tparam T result produced for each node and for the whole schema
    * @param schema    schema to translate
    * @param recursive evidence that `Type` can be unfolded layer by layer into `TypeF`
    * @param algebra   function turning one `TypeF[T]` layer into an `M[T]`
    * @return the translated schema inside `M`
    */
  def translateSchema[M[_]: Monad, T](schema: Type)(implicit recursive: Recursive.Aux[Type, TypeF],
                                                    algebra: AlgebraM[M, TypeF, T]): M[T] =
    schema.cataM(algebra)
}
/** Translation of a [[Type]] schema into a Spark SQL `DataType`. */
object Spark {
  import TypeF._
  // Spark's `StringType` is renamed on import because this file already defines a `StringType`.
  import org.apache.spark.sql.types.{StringType => StringField, _}
  import scalaz.Id.Id

  /**
    * Builds the Spark type for one schema layer whose children are already Spark types.
    * The struct name is not used: only the fields end up in the resulting `StructType`.
    */
  implicit val toSparkAlgebra: Algebra[TypeF, DataType] = {
    case ArrayF(elementType) => ArrayType(elementType)
    case StructF(_, fields)  => StructType(fields.map { case (k, v) => StructField(k, v) })
    case IntF                => IntegerType
    case StringF             => StringField
  }

  /**
    * Converts a whole schema to its Spark representation.
    *
    * @param fieldType schema to convert
    * @return the corresponding Spark `DataType`
    */
  def convertSchema(fieldType: Type): DataType = translateSchema[Id, DataType](fieldType)
}
/** Translation of a [[Type]] schema into an Avro `Schema`. */
object Avro {
  import TypeF._
  import org.apache.avro._
  import scalaz.Id.Id

  /**
    * Builds the algebra that turns one schema layer, whose children are already Avro
    * schemas, into an Avro schema.
    *
    * @param namespace namespace handed to `SchemaBuilder.builder` for every record
    * @return the algebra used by [[convertSchema]]
    */
  def toAvroAlgebra(namespace: String): Algebra[TypeF, Schema] = {
    case StructF(name, fields) =>
      fields
        // Thread the field assembler through all fields, adding each one without a default.
        .foldLeft(SchemaBuilder.builder(namespace).record(name).fields()) {
          case (builder, (n, schema)) =>
            builder.name(n).`type`(schema).noDefault()
        }
        .endRecord()
    case ArrayF(elementType) => SchemaBuilder.array().items(elementType)
    case IntF                => Schema.create(Schema.Type.INT)
    case StringF             => Schema.create(Schema.Type.STRING)
  }

  /**
    * Converts a whole schema to its Avro representation.
    *
    * @param fieldType schema to convert
    * @param namespace namespace used for the generated records
    * @return the corresponding Avro `Schema`
    */
  def convertSchema(fieldType: Type, namespace: String): Schema = {
    // Explicitly typed so the implicit is picked up as the algebra `translateSchema` asks for.
    implicit val algebra: Algebra[TypeF, Schema] = toAvroAlgebra(namespace)
    translateSchema[Id, Schema](fieldType)
  }
}
/** Directly recursive value tree: an int, a string, a struct of named values or an array of values. */
sealed trait Data
/** Integer leaf. */
final case class GInt(value: Int) extends Data
/** String leaf. */
final case class GString(value: String) extends Data
/** Struct value; `fields` pairs each field name with its value. */
final case class GStruct(fields: Vector[(String, Data)]) extends Data
/** Array value holding `items`. */
final case class GArray(items: Vector[Data]) extends Data
/** Non-recursive counterpart of `Data`: the positions that hold a nested `Data` there hold an `A` here. */
sealed trait DataF[+A]
/** Shapes that contain no `A`; being `DataF[Nothing]`, they fit any `DataF[A]` thanks to covariance. */
sealed trait TerminalData extends DataF[Nothing]
/** Counterpart of `GInt`. */
final case class GIntF(value: Int) extends TerminalData
/** Counterpart of `GString`. */
final case class GStringF(value: String) extends TerminalData
/** Counterpart of `GStruct`. NOTE(review): fields are a `List` here but a `Vector` in `GStruct` — confirm this is intended. */
final case class GStructF[A](fields: List[(String, A)]) extends DataF[A]
/** Counterpart of `GArray`. */
final case class GArrayF[A](items: Vector[A]) extends DataF[A]
/** Type-class instances for [[DataF]]. */
object DataF {

  /**
    * Applies `f` to every `A` held by a node; terminal nodes are returned as they are.
    * The instance is explicitly typed so that it is visible independently of definition
    * order and stays valid under Scala 3, which rejects untyped implicits.
    */
  implicit val dataFunctor: Functor[DataF] = new Functor[DataF] {
    override def map[A, B](fa: DataF[A])(f: (A) => B): DataF[B] = fa match {
      case t: TerminalData  => t
      case GStructF(fields) => GStructF(fields.map { case (k, v) => k -> f(v) })
      case GArrayF(items)   => GArrayF(items map f)
    }
  }
}
/** Derivation of a JSON validation rule producing [[Data]] from a [[Type]] schema. */
object Validation {
  import scalaz._, Scalaz._
  import jto.validation._, jsonast._, Rules._
  import TypeF._

  /** Rule reading an `A` out of a `JValue`. */
  type JRule[A] = Rule[JValue, A]

  /**
    * scalaz `Applicative` for [[JRule]], delegating to `Rule.pure` and `Rule#ap`.
    * Explicitly typed so that it is visible independently of definition order.
    */
  implicit val jRuleApplicative: Applicative[JRule] = new Applicative[JRule] {
    override def point[A](a: => A): JRule[A] = Rule.pure(a)
    override def ap[A, B](fa: => JRule[A])(f: => JRule[(A) => B]): JRule[B] = fa ap f
  }

  /**
    * Builds the rule for one schema layer whose children are already rules.
    * The struct name is not used: only the fields take part in the rule.
    */
  implicit val schemaToJRule: Algebra[TypeF, JRule[Data]] = {
    case StructF(_, fields) =>
      fields
        .traverse[JRule, (String, Data)] {
          case (k, r) =>
            // Read field `k` with its own rule and keep the field name next to the result.
            (Path \ k).read[JValue, JValue, Data](r).map(k -> _)
        }
        .map(GStruct)
    case ArrayF(rule) =>
      pickSeq(rule).map(d => GArray(d.toVector))
    case IntF    => Rule.of[JValue, Int].map(GInt)
    case StringF => Rule.of[JValue, String].map(GString)
  }

  /**
    * Derives the validation rule for a whole schema.
    *
    * @param schema schema to derive the rule from
    * @return a rule reading a [[Data]] value from a `JValue`
    */
  def convertSchema(schema: Type): JRule[Data] = translateSchema[Id, JRule[Data]](schema)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment