Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
/**
  * Algebra describing lookups against a schema registry, abstracted over the
  * effect type in which the lookup is performed.
  *
  * @tparam F effect type that wraps the result of each operation
  */
trait SchemaRegistryAlgebra[F[_]] {

  /**
    * Looks up the schema associated with the given identifier.
    *
    * @param id numeric identifier of the schema
    * @return the schema as a string, wrapped in `F`
    */
  def getSchemaStringForID(id: Int): F[String]
}
/** Constructors for [[SchemaRegistryAlgebra]] interpreters. */
object SchemaRegistryAlgebra {

  /**
    * Appends `suffix` to `url`, inserting a `/` separator unless `url` already
    * ends with one, and re-validates the result as a `Url`.
    *
    * @param ef     converts the validation message produced by `refineV` into
    *               the error type `E` raised in `M`
    * @param url    base URL
    * @param suffix path segment(s) to append
    * @return the joined URL in `M`, or a raised `E` if the joined string does
    *         not satisfy the `Url` predicate
    */
  private[this] def appendToUrl[E, M[_]: MonadError[?[_], E]](ef: String => E)(
      url: String Refined Url,
      suffix: String
  ): M[String Refined Url] = {
    val base   = url.value
    val joined = if (base.endsWith("/")) base + suffix else base + "/" + suffix
    refineV[Url](joined).fold[M[String Refined Url]](
      message => MonadError[M, E].raiseError(ef(message)),
      refined => MonadError[M, E].pure(refined)
    )
  }

  /**
    * Builds an `IO`-based interpreter that fetches schemas over HTTP from
    * `<schemaRegistryURL>/schemas/ids/<id>` and remembers every successfully
    * fetched schema in an in-memory map keyed by id.
    *
    * Each call to this method allocates its own, initially empty, cache; the
    * cache lives as long as the returned instance and is never evicted.
    *
    * @param schemaRegistryURL base URL of the schema registry
    * @return an interpreter whose lookups hit the network only on a cache miss
    */
  def ioSchemaRegistry(schemaRegistryURL: String Refined Url): SchemaRegistryAlgebra[IO] =
    new SchemaRegistryAlgebra[IO] {

      // NOTE(review): this is an effect that is run on every cache miss, so
      // presumably a new HTTP client is built per miss and never shut down.
      // Confirm against the http4s version in use and consider having the
      // caller supply a managed client instead.
      private[this] val httpClientIO = Http1Client[IO]()

      // The Ref must be allocated exactly once per interpreter. Keeping the
      // `IO[Ref[...]]` returned by `Ref.of` and running it inside every lookup
      // would create a fresh empty map each time, so nothing would ever be
      // served from the cache. `Ref.unsafe` allocates eagerly, which lets the
      // method keep its non-effectful return type.
      private[this] val cache: Ref[IO, Map[Int, String]] =
        Ref.unsafe[IO, Map[Int, String]](Map.empty[Int, String])

      /** `appendToUrl` specialised to `IO`, raising a `RuntimeException` on an invalid URL. */
      private[this] def appendToUrlIO(url: String Refined Url, suffix: String): IO[String Refined Url] =
        appendToUrl[Throwable, IO](es => new RuntimeException(es))(url, suffix)

      /** Fetches the schema for `id` over HTTP and stores it in the cache. */
      private[this] def fetchAndCache(id: Int): IO[String] =
        for {
          client       <- httpClientIO
          url          <- appendToUrlIO(schemaRegistryURL, s"schemas/ids/$id")
          resultString <- client.expect[String](url.value)
          out          <- cache.modify(current => (current + (id -> resultString), resultString))
        } yield out

      /**
        * Returns the cached schema for `id` if present; otherwise fetches it
        * from the registry and caches the response body.
        *
        * Failed requests are not cached. Concurrent misses for the same id may
        * each perform a request; the map is updated atomically either way.
        */
      override def getSchemaStringForID(id: Int): IO[String] =
        cache.get.flatMap { known =>
          known.get(id).fold(fetchAndCache(id))(IO.pure)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment