Created
October 7, 2019 13:33
-
-
Save notthatbreezy/96f3e1efa11ef268cb46b39296cec6e6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package notthatbreezy.memcached4s.client | |
import java.net.InetSocketAddress | |
import cats.effect._ | |
import cats.implicits._ | |
import com.colisweb.tracing.LoggingTracingContext | |
import com.colisweb.tracing.implicits._ | |
import fs2.io.tcp._ | |
import fs2.{Chunk, text} | |
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger | |
import io.circe._ | |
import io.circe.generic.semiauto._ | |
import io.circe.parser._ | |
import io.circe.syntax._ | |
import scala.concurrent.duration._ | |
import scala.language.postfixOps | |
/** Minimal memcached text-protocol client that stores values as compact JSON.
  *
  * Every operation opens a fresh TCP connection to `address`, performs a single
  * command and closes the connection again.
  *
  * Keys are interpolated verbatim into the protocol command line, so callers
  * must make sure a key contains no whitespace or control characters.
  *
  * @param address host and port of the memcached server
  * @tparam F effect type in which all operations are suspended
  */
case class Client[F[_]: Concurrent](address: InetSocketAddress)(
    implicit cs: ContextShift[F]
) {
  import java.nio.charset.StandardCharsets.UTF_8

  /** Upper bound applied to every individual socket write and read. */
  private val IoTimeout: FiniteDuration = 10.seconds

  /** Maximum number of bytes requested from the socket per read. */
  private val MaxReadBytes: Int = 256 * 1024

  /** Opens a connection to `address`, runs `body` against it and releases the
    * socket, socket group and blocker afterwards (also on failure).
    */
  private def withSocket[A](body: Socket[F] => F[A]): F[A] =
    Blocker[F].use { blocker =>
      SocketGroup[F](blocker).use { socketGroup =>
        socketGroup.client[F](address).use(body)
      }
    }

  /** Stores `value` under `key`, encoded as JSON without whitespace, with
    * flags `0` and no expiry.
    *
    * NOTE: the server's reply is not read, so a rejected `set` is not reported.
    *
    * @param key   cache key, sent verbatim
    * @param value value to encode and store
    */
  def set[T: Encoder](key: String, value: T): F[Unit] =
    withSocket { socket =>
      val valueJson = value.asJson.noSpaces
      // The declared length must be the UTF-8 byte count (not the character
      // count), and must use the same charset as the bytes actually written.
      val byteCount = valueJson.getBytes(UTF_8).length
      val command = s"set $key 0 0 $byteCount\r\n$valueJson\r\n"
      for {
        logger <- Slf4jLogger.create[F]
        _ <- logger.debug(s"Writing $key")
        _ <- socket.write(Chunk.bytes(command.getBytes(UTF_8)), Some(IoTimeout))
      } yield ()
    }

  /** Looks up `key` and decodes the stored JSON into a `T`.
    *
    * @param key cache key, sent verbatim
    * @return `None` when the reply carries no data line; `Some(value)` on a
    *         hit. A payload that is not valid JSON, or that does not decode to
    *         `T`, fails the effect with the corresponding circe error.
    */
  def get[T: Decoder](key: String): F[Option[T]] =
    withSocket { socket =>
      val request = Chunk.bytes(s"get $key\r\n".getBytes(UTF_8))
      for {
        logger <- Slf4jLogger.create[F]
        _ <- logger.debug("Before requesting response")
        _ <- socket.write(request, Some(IoTimeout))
        // Read line by line up to and including the terminating "END" line.
        response <- socket
          .reads(MaxReadBytes, Some(IoTimeout))
          .through(text.utf8Decode)
          .through(text.lines)
          .takeThrough(_ != "END")
          .compile
          .toVector
        _ <- logger.debug("After Response")
        // On a hit, line 0 is the "VALUE ..." header and line 1 the payload;
        // on a miss only "END" is returned, so index 1 is absent.
        // Failures are raised inside F rather than thrown from within `map`.
        decoded <- response.lift(1).traverse(payload => decode[T](payload).liftTo[F])
      } yield decoded
    }

  /** Returns the cached value for `key` if present; otherwise evaluates `f`,
    * stores its result under `key` and returns it.
    *
    * @param key cache key
    * @param f   computation producing the value on a cache miss (by-name, so
    *            it is not evaluated on a hit)
    */
  def caching[T: Encoder: Decoder](key: String)(f: => F[T]): F[T] =
    get[T](key).flatMap {
      case Some(cached) => cached.pure[F]
      case None =>
        for {
          value <- f
          _ <- set[T](key, value)
        } yield value
    }
}
/** Demo entry point: fetches (or computes and caches) a `Placeholder` through
  * [[Client.caching]] inside a logging tracing context and prints the result.
  *
  * Optional arguments: `<host> <port>` of the memcached server. When omitted,
  * the defaults below are used.
  */
object App extends IOApp {

  // Testing out encoding/decoding
  case class Placeholder(data: String)

  implicit val fooDecoder: Decoder[Placeholder] = deriveDecoder[Placeholder]
  implicit val fooEncoder: Encoder[Placeholder] = deriveEncoder[Placeholder]

  /** Server used when no host/port is given on the command line. */
  private val DefaultHost = "172.19.0.2"
  private val DefaultPort = 11211

  /** Runs the caching round-trip once.
    *
    * @param args optional `host` (index 0) and `port` (index 1)
    * @return `ExitCode.Success` once the result has been printed; a malformed
    *         or out-of-range port fails the returned `IO`.
    */
  override def run(args: List[String]): IO[ExitCode] = {
    val tracer = LoggingTracingContext[IO]() _
    for {
      // Parsing the port and building the address can throw, so suspend it.
      address <- IO {
        val host = args.headOption.getOrElse(DefaultHost)
        val port = args.lift(1).fold(DefaultPort)(_.toInt)
        new InetSocketAddress(host, port)
      }
      client = Client[IO](address)
      result <- tracer("cachingTracer", Map.empty) wrap {
        client.caching("cool") {
          IO.pure(Placeholder("cat"))
        }
      }
      // Console output is a side effect; keep it suspended in IO.
      _ <- IO(println(result))
    } yield ExitCode.Success
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment