Skip to content

Instantly share code, notes, and snippets.

@notthatbreezy
Created October 7, 2019 13:33
Show Gist options
  • Save notthatbreezy/96f3e1efa11ef268cb46b39296cec6e6 to your computer and use it in GitHub Desktop.
Save notthatbreezy/96f3e1efa11ef268cb46b39296cec6e6 to your computer and use it in GitHub Desktop.
package notthatbreezy.memcached4s.client
import java.net.InetSocketAddress
import cats.effect._
import cats.implicits._
import com.colisweb.tracing.LoggingTracingContext
import com.colisweb.tracing.implicits._
import fs2.io.tcp._
import fs2.{Chunk, text}
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
import io.circe._
import io.circe.generic.semiauto._
import io.circe.parser._
import io.circe.syntax._
import scala.concurrent.duration._
import scala.language.postfixOps
case class Client[F[_]: Concurrent](address: InetSocketAddress)(
implicit cs: ContextShift[F]
) {
def set[T: Encoder](key: String, value: T): F[Unit] = {
Blocker[F].use { blocker =>
SocketGroup[F](blocker).use { socketGroup =>
socketGroup.client[F](address).use { client =>
val valueJson = value.asJson.noSpaces
val command =
s"set $key 0 0 ${valueJson.getBytes.length}\r\n$valueJson\r\n"
for {
logger <- Slf4jLogger.create[F]
_ <- logger.debug(s"Writing $key")
_ <- client.write(Chunk.bytes(command.getBytes))
} yield ()
}
}
}
}
def get[T: Decoder](key: String): F[Option[T]] = {
Blocker[F]
.use { blocker =>
SocketGroup[F](blocker).use { socketGroup =>
val clientResource = socketGroup.client[F](address)
clientResource.use { client =>
val getString: Array[Byte] = s"get $key\r\n".getBytes
for {
logger <- Slf4jLogger.create[F]
_ <- logger.debug("Before requesting response")
_ <- client.write(Chunk.bytes(getString), Some(10 seconds))
response <- client
.reads(256 * 1024, Some(10 seconds))
.through(text.utf8Decode)
.through(text.lines)
.takeThrough(_ != "END")
.compile
.toVector
_ <- logger.debug("After Response")
} yield {
response.lift(1) match {
case Some(v) => {
parse(v) match {
case Left(e) => {
throw e
}
case Right(s) => {
s.as[T] match {
case Left(e) => throw e
case Right(t) => Some(t)
}
}
}
}
case _ => None
}
}
}
}
}
}
def caching[T: Encoder: Decoder](key: String)(f: => F[T]): F[T] = {
get[T](key) flatMap {
case Some(t) => t.pure[F]
case _ =>
for {
value <- f
_ <- set[T](key, value)
} yield value
}
}
}
object App extends IOApp {
// Testing out encoding/decoding
case class Placeholder(data: String)
implicit val fooDecoder: Decoder[Placeholder] = deriveDecoder[Placeholder]
implicit val fooEncoder: Encoder[Placeholder] = deriveEncoder[Placeholder]
override def run(args: List[String]): IO[ExitCode] = {
val address: InetSocketAddress = new InetSocketAddress("172.19.0.2", 11211)
val client = Client[IO](address)
val tracer = LoggingTracingContext[IO]() _
for {
result <- tracer("cachingTracer", Map.empty) wrap {
client.caching("cool") {
IO.pure(Placeholder("cat"))
}
}
} yield {
println(result)
ExitCode.Success
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment