// Gist vil1/493c6c5cc8d236d7dcf42a350fe5ab7a (GitHub page text captured together with the source).
// Note from the hosting page: this file contains bidirectional Unicode text that may be
// interpreted or compiled differently than what appears below. To review, open the file in an
// editor that reveals hidden Unicode characters.
import matryoshka._
import matryoshka.implicits._
import scalaz.Functor
/** Schema description as a directly recursive ADT: composite nodes refer to `Type` itself. */
sealed trait Type

/** Leaf schema node for integer values. */
case object IntType extends Type

/** Leaf schema node for string values. */
case object StringType extends Type

/**
 * Named record schema.
 *
 * @param name   name of the record
 * @param fields ordered pairs of field name and field schema
 */
final case class Struct(name: String, fields: Vector[(String, Type)]) extends Type

/**
 * Schema of a collection whose elements all have the schema `elementType`.
 *
 * NOTE: this name shadows `scala.Array` wherever it is in scope; write `scala.Array`
 * explicitly when the standard array type is meant.
 */
final case class Array(elementType: Type) extends Type
/**
 * Non-recursive counterpart of the schema ADT: every position that would hold a nested
 * schema holds a value of type `A` instead.
 *
 * Covariant in `A`, so the terminal nodes (typed `TypeF[Nothing]`) fit any `TypeF[A]`.
 */
sealed trait TypeF[+A]

/** Marker for nodes that carry no `A`, i.e. leaves. */
sealed trait TerminalType extends TypeF[Nothing]

/** Leaf node for integer values. */
case object IntF extends TerminalType

/** Leaf node for string values. */
case object StringF extends TerminalType

/**
 * Named record node.
 *
 * @param name   name of the record
 * @param fields ordered pairs of field name and the `A` standing in for the field's schema
 */
final case class StructF[A](name: String, fields: Vector[(String, A)]) extends TypeF[A]

/** Collection node; `elementType` is the `A` standing in for the element schema. */
final case class ArrayF[A](elementType: A) extends TypeF[A]
/**
 * Type-class instances that let a `Type` be folded through `TypeF`, plus the generic
 * `translateSchema` entry point used by the concrete back-ends.
 */
object TypeF {
  import scalaz._, Scalaz._
  import scala.language.higherKinds

  /** Functor for `TypeF`: applies `f` to every `A` held by a node; leaves are returned as-is. */
  implicit val typeFunctor: Functor[TypeF] = new Functor[TypeF] {
    override def map[A, B](fa: TypeF[A])(f: (A) => B): TypeF[B] = fa match {
      // Leaves are TypeF[Nothing]; covariance makes them valid TypeF[B] without copying.
      case t: TerminalType       => t
      case StructF(name, fields) => StructF(name, fields map { case (k, v) => k -> f(v) })
      case ArrayF(elementType)   => ArrayF(f(elementType))
    }
  }

  /**
   * Traverse for `TypeF`: runs the effectful `f` on every `A` held by a node and rebuilds the
   * node inside `G`. Struct fields are traversed in their stored order.
   */
  implicit val typeTraverse: Traverse[TypeF] = new Traverse[TypeF] {
    override def traverseImpl[G[_], A, B](fa: TypeF[A])(f: (A) => G[B])(
        implicit G: Applicative[G]): G[TypeF[B]] =
      fa match {
        case t: TerminalType => G.point(t)
        case StructF(name, fields) =>
          // Split names from values so only the values go through `f`, then re-pair them.
          val (keys, values) = fields.unzip
          values.traverse(f).map(vs => StructF(name, keys zip vs))
        case ArrayF(elementType) => f(elementType).map(ArrayF.apply)
      }
  }

  /** Unfolds one layer of a `Type` into the matching `TypeF[Type]` node. */
  implicit val typeRecursive: Recursive.Aux[Type, TypeF] = new Recursive[Type] {
    type Base[A] = TypeF[A]

    override def project(t: Type)(implicit BF: Functor[Base]): BaseT[Type] = t match {
      case IntType              => IntF
      case StringType           => StringF
      case Struct(name, fields) => StructF(name, fields)
      case Array(elementType)   => ArrayF(elementType)
    }
  }

  /**
   * Folds `schema` into a `T` by running `cataM` with the implicit algebra.
   *
   * @tparam M effect in which the algebra runs (callers in this file use `Id`)
   * @tparam T result type produced by the algebra
   * @param schema    the schema to translate
   * @param recursive how to unfold `Type` into `TypeF`; used implicitly by the `cataM` syntax
   * @param algebra   turns one `TypeF[T]` layer into an `M[T]`
   * @return the translated schema inside `M`
   */
  def translateSchema[M[_]: Monad, T](schema: Type)(implicit recursive: Recursive.Aux[Type, TypeF],
                                                    algebra: AlgebraM[M, TypeF, T]): M[T] =
    schema.cataM(algebra)
}
/** Translation of a `Type` schema into a Spark SQL `DataType`. */
object Spark {
  import TypeF._
  // Spark's StringType is renamed so it does not clash with this file's own StringType.
  import org.apache.spark.sql.types.{StringType => SparkStringType, _}
  import scalaz.Id.Id

  /**
   * One-layer translation from `TypeF` to Spark types. Children arrive already translated.
   * The struct's own name is not used, and no nullability arguments are passed, so Spark's
   * defaults for `ArrayType` and `StructField` apply.
   */
  implicit val toSparkAlgebra: Algebra[TypeF, DataType] = {
    case ArrayF(elementType) => ArrayType(elementType)
    case StructF(_, fields)  => StructType(fields.map { case (k, v) => StructField(k, v) })
    case IntF                => IntegerType
    case StringF             => SparkStringType
  }

  /**
   * Converts a whole schema to its Spark `DataType`.
   *
   * @param fieldType the schema to convert
   * @return the equivalent Spark data type
   */
  def convertSchema(fieldType: Type): DataType = translateSchema[Id, DataType](fieldType)
}
/** Translation of a `Type` schema into an Avro `Schema`. */
object Avro {
  import TypeF._
  import org.apache.avro._
  import scalaz.Id.Id

  /**
   * Builds the one-layer translation from `TypeF` to Avro schemas. Children arrive already
   * translated.
   *
   * @param namespace passed to `SchemaBuilder.builder` for every record that is built
   * @return an algebra producing Avro schemas
   */
  def toAvroAlgebra(namespace: String): Algebra[TypeF, Schema] = {
    case StructF(name, fields) =>
      // Thread the record's field assembler through all fields, adding each without a default.
      fields
        .foldLeft(SchemaBuilder.builder(namespace).record(name).fields()) {
          case (builder, (n, schema)) =>
            builder.name(n).`type`(schema).noDefault()
        }
        .endRecord()
    case ArrayF(elementType) => SchemaBuilder.array().items(elementType)
    case IntF                => Schema.create(Schema.Type.INT)
    case StringF             => Schema.create(Schema.Type.STRING)
  }

  /**
   * Converts a whole schema to its Avro `Schema`.
   *
   * @param fieldType the schema to convert
   * @param namespace namespace handed to [[toAvroAlgebra]]
   * @return the equivalent Avro schema
   */
  def convertSchema(fieldType: Type, namespace: String): Schema = {
    implicit val algebra: Algebra[TypeF, Schema] = toAvroAlgebra(namespace)
    translateSchema[Id, Schema](fieldType)
  }
}
/** Generic data value as a directly recursive ADT: composite nodes hold `Data` themselves. */
sealed trait Data

/** Integer value. */
final case class GInt(value: Int) extends Data

/** String value. */
final case class GString(value: String) extends Data

/** Record value: ordered pairs of field name and field value. */
final case class GStruct(fields: Vector[(String, Data)]) extends Data

/** Collection value holding `items` in order. */
final case class GArray(items: Vector[Data]) extends Data
/**
 * Non-recursive counterpart of the data ADT: every position that would hold a nested value
 * holds an `A` instead.
 *
 * Covariant in `A`, so the terminal nodes (typed `DataF[Nothing]`) fit any `DataF[A]`.
 */
sealed trait DataF[+A]

/** Marker for nodes that carry no `A`, i.e. leaves. */
sealed trait TerminalData extends DataF[Nothing]

/** Integer leaf. */
final case class GIntF(value: Int) extends TerminalData

/** String leaf. */
final case class GStringF(value: String) extends TerminalData

/**
 * Record node: ordered pairs of field name and the `A` standing in for the field value.
 *
 * NOTE(review): this uses `List` while `GStruct` and `StructF` use `Vector`; kept as `List`
 * so existing callers keep compiling. Confirm whether the difference is intentional.
 */
final case class GStructF[A](fields: List[(String, A)]) extends DataF[A]

/** Collection node holding one `A` per item, in order. */
final case class GArrayF[A](items: Vector[A]) extends DataF[A]
/** Type-class instances for `DataF`. */
object DataF {

  /** Functor for `DataF`: applies `f` to every `A` held by a node; leaves are returned as-is. */
  implicit val dataFunctor: Functor[DataF] = new Functor[DataF] {
    override def map[A, B](fa: DataF[A])(f: (A) => B): DataF[B] = fa match {
      // Leaves are DataF[Nothing]; covariance makes them valid DataF[B] without copying.
      case t: TerminalData  => t
      case GStructF(fields) => GStructF(fields.map { case (k, v) => k -> f(v) })
      case GArrayF(items)   => GArrayF(items map f)
    }
  }
}
/** Derivation of a JSON validation rule producing `Data` from a `Type` schema. */
object Validation {
  import scalaz._, Scalaz._
  import jto.validation._, jsonast._, Rules._
  import TypeF._

  /** A validation rule that reads a `JValue` and yields an `A`. */
  type JRule[A] = Rule[JValue, A]

  /** scalaz `Applicative` for `JRule`, delegating to `Rule.pure` and `Rule#ap`. */
  implicit val jRuleApplicative: Applicative[JRule] = new Applicative[JRule] {
    override def point[A](a: => A): JRule[A] = Rule.pure(a)
    override def ap[A, B](fa: => JRule[A])(f: => JRule[(A) => B]): JRule[B] = fa ap f
  }

  /**
   * One-layer translation from `TypeF` to a rule producing `Data`. Children arrive as the
   * rules already derived for the nested schemas. The struct's own name is not used.
   */
  implicit val schemaToJRule: Algebra[TypeF, JRule[Data]] = {
    case StructF(_, fields) =>
      fields
        .traverse[JRule, (String, Data)] {
          case (k, r) =>
            // Apply the field's rule at path `k` and keep the field name next to the result.
            (Path \ k).read[JValue, JValue, Data](r).map(k -> _)
        }
        .map(GStruct)
    case ArrayF(rule) =>
      pickSeq(rule).map(d => GArray(d.toVector))
    case IntF    => Rule.of[JValue, Int].map(GInt)
    case StringF => Rule.of[JValue, String].map(GString)
  }

  /**
   * Derives the validation rule for a whole schema.
   *
   * @param schema the schema to derive a rule from
   * @return a rule that reads a `JValue` and yields the corresponding `Data`
   */
  def convertSchema(schema: Type): JRule[Data] = translateSchema[Id, JRule[Data]](schema)
}
// (GitHub page footer captured with the source: "Sign up for free to join this conversation
// on GitHub. Already have an account? Sign in to comment.")