Skip to content

Instantly share code, notes, and snippets.

@bphenriques
Last active October 20, 2023 16:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bphenriques/10aa33b953e8bf1b2226c46a5341a56b to your computer and use it in GitHub Desktop.
Save bphenriques/10aa33b953e8bf1b2226c46a5341a56b to your computer and use it in GitHub Desktop.
CheckForwardJsonSchemaCompatibility
//> using scala 3.3.1
//> using repository https://packages.confluent.io/maven/
//> using dep io.confluent:kafka-json-schema-provider:7.5.1
//> using dep org.typelevel::cats-core:2.10.0
//> using dep co.fs2::fs2-core:3.9.2
//> using dep co.fs2::fs2-io:3.9.2
import io.confluent.kafka.schemaregistry.json.JsonSchema
import fs2.io.file.*
import fs2.text
import cats.syntax.all.*
import cats.effect.{ExitCode, IO, IOApp}
import io.confluent.kafka.schemaregistry.{CompatibilityLevel, SimpleParsedSchemaHolder}
import scala.jdk.CollectionConverters.*
import java.util.Collections
/** Command-line tool that checks whether a proposed JSON Schema is FORWARD compatible with a
  * previous one.
  *
  * Expected arguments: `<previous-schema-file> <proposed-schema-file>`.
  */
object Main extends IOApp {

  /** Reads the two schema file paths from `args` and runs the compatibility check.
    *
    * @param args command-line arguments; the 1st is the previous schema file, the 2nd the
    *             proposed schema file
    * @return `ExitCode.Success` when the check passes; otherwise the returned `IO` fails with
    *         the error describing the missing argument or the incompatibilities
    */
  override def run(args: List[String]): IO[ExitCode] = {
    // Turns an optional raw argument into a Path, failing with `onError` when it is absent.
    def parsePath(str: Option[String], onError: => String): IO[Path] =
      IO.fromOption(str)(new RuntimeException(onError)).map(Path(_))

    for {
      previous <- parsePath(args.headOption, "The 1st argument is the previous json-schema")
      // `lift(1)` is total: it yields None for lists shorter than two elements.
      proposed <- parsePath(args.lift(1), "The 2nd argument is the proposed json-schema")
      _        <- assertForwardCompatibility(previous, proposed)
    } yield ExitCode.Success
  }

  /** Checks that the schema stored in `proposed` is FORWARD compatible with the one stored in
    * `previous`.
    *
    * @param previous path of the file holding the schema currently in use
    * @param proposed path of the file holding the candidate schema
    * @return an `IO` that prints "Compatible!" when no incompatibility is reported, or fails
    *         with a `RuntimeException` whose message lists every reported error, one per line
    */
  def assertForwardCompatibility(previous: Path, proposed: Path): IO[Unit] = {
    // Reads the whole file as UTF-8 text and wraps it in a JsonSchema.
    def parse(file: Path): IO[JsonSchema] =
      Files[IO].readAll(file).through(text.utf8.decode).compile.string.flatMap { schemaString =>
        IO(new JsonSchema(schemaString))
      }

    (
      parse(previous),
      parse(proposed)
    ).flatMapN { case (previousSchema, proposedSchema) =>
      IO(
        // `isCompatible` is invoked on the schema under validation (the proposal) and receives
        // the earlier schema(s) as its argument; swapping the two would check the opposite
        // direction.
        proposedSchema
          .isCompatible(
            CompatibilityLevel.FORWARD,
            Collections.singletonList(new SimpleParsedSchemaHolder(previousSchema))
          )
          .asScala
      ).flatMap { errors =>
        if (errors.isEmpty) {
          IO.println("Compatible!")
        } else {
          IO.raiseError(new RuntimeException(errors.mkString("\n")))
        }
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment