Skip to content

Instantly share code, notes, and snippets.

@machuz
Last active July 31, 2018 05:29
Show Gist options
  • Save machuz/2c0dabb4c23a69cba70e94570d887f4c to your computer and use it in GitHub Desktop.
Save machuz/2c0dabb4c23a69cba70e94570d887f4c to your computer and use it in GitHub Desktop.
redis実装パターン1
sealed abstract class RedisClient {
def put[A: ByteStringFormatter](
key: CacheKey,
value: A,
expireSeconds: Option[Long] = None
): Task[\/[EsError, A]]
def putList[A: ByteStringFormatter](
key: CacheKey,
value: Seq[A],
expireSeconds: Option[Long] = None
): Task[\/[EsError, Seq[A]]]
def scan[A: ByteStringFormatter](
matchGlob: Option[CacheKeyGlob]
): Task[\/[EsError, Seq[A]]]
def get[A: ByteStringFormatter](
key: CacheKey
): Task[\/[EsError, Option[A]]]
def getList[A: ByteStringFormatter](
key: CacheKey
): Task[\/[EsError, Seq[A]]]
def delete(key: CacheKey): Task[\/[EsError, Unit]]
def exists(key: CacheKey): Task[\/[EsError, Boolean]]
def clear: Task[\/[EsError, Unit]]
}
object RedisClient {
import io.circe.syntax._
def byteStringFormatter[A](implicit encoder: Encoder[A], decoder: Decoder[A]): ByteStringFormatter[A] =
new ByteStringFormatter[A] {
def serialize(data: A): ByteString = ByteString(data.asJson.noSpaces)
def deserialize(bs: ByteString): A = {
\/.fromEither(io.circe.parser.decode[A](bs.utf8String)) match {
case \/-(r) => r
case -\/(e) => throw e
}
}
implicit val redisReplyDeserializer = new RedisReplyDeserializer[A] {
override def deserialize: PartialFunction[RedisReply, A] = {
case Bulk(Some(bs)) => byteStringFormatter.deserialize(bs)
}
}
}
}
class RedisClientImpl @Inject()(
system: ActorSystem
) extends RedisClient {
protected val c: RediscalaClient =
RediscalaClient(
SharedAdapterConf.redis.master.host,
SharedAdapterConf.redis.master.port,
SharedAdapterConf.redis.master.password,
SharedAdapterConf.redis.master.dbNum
)(_system = system)
final def get[A: ByteStringFormatter](
key: CacheKey
): Task[\/[EsError, Option[A]]] = {
for {
r <- {
Task
.deferFuture(c.get(Tag.unwrap(key)))
.materialize
.map {
case S(Some(r)) => r.some.right
case S(None) => None.right
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
final def getList[A: ByteStringFormatter](
key: CacheKey
): Task[\/[EsError, Seq[A]]] = {
for {
r <- {
Task
.deferFuture(c.lrange[A](Tag.unwrap(key), 0, -1))
.materialize
.map {
case S(xs) => xs.right
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
final def scan[A: ByteStringFormatter](
matchGlob: Option[CacheKeyGlob]
): Task[\/[EsError, Seq[A]]] = {
for {
keySeqTask <- keys(matchGlob)
r <- {
keySeqTask match {
case \/-(keySeq) if keySeq.nonEmpty =>
Task
.deferFuture(c.mget(keySeq: _*))
.materialize
.map {
case S(scanResSeq) => scanResSeq.flatten.right
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
case \/-(keySeq) if keySeq.isEmpty =>
Task.now(Nil.right)
case -\/(e) => Task.now(e.left)
}
}
} yield r
}
final def putList[A: ByteStringFormatter](
key: CacheKey,
values: Seq[A],
expireSeconds: Option[Long] = None
): Task[\/[EsError, Seq[A]]] = {
for {
r <- {
Task
.deferFuture(c.lpush(Tag.unwrap(key), values: _*))
.materialize
.map {
case S(_) => values.right
case F(throwable) =>
NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
_ <- {
Task
.deferFuture {
c.expire(Tag.unwrap(key), expireSeconds.get)
}
.materialize
.map {
case S(_) => ().right
case F(throwable) =>
NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
final def put[A: ByteStringFormatter](
key: CacheKey,
value: A,
expireSeconds: Option[Long] = None
): Task[\/[EsError, A]] = {
for {
r <- {
Task
.deferFuture(c.set(Tag.unwrap(key), value, expireSeconds))
.materialize
.map {
case S(isSuccessful) =>
if (isSuccessful) value.right[EsError]
else
throw new RuntimeException(
s"redis put command failure [key=$key,value=$value,expireSeconds=$expireSeconds]"
)
case F(throwable) =>
NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
// def bulkPut[A: ByteStringFormatter](
// keyValues: Map[CacheKey, A],
// dbNum: DBNum,
// expireSeconds: Option
// ) = {
// c.mset()
// }
final def delete(
key: CacheKey
): Task[\/[EsError, Unit]] = {
for {
r <- {
Task
.deferFuture(c.del(Tag.unwrap(key)))
.materialize
.map {
case S(_) => ().right
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
final def exists(
key: CacheKey
): Task[\/[EsError, Boolean]] = {
for {
r <- {
Task
.deferFuture(c.exists(Tag.unwrap(key)))
.materialize
.map {
case S(r) => r.right
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
final def clear: Task[\/[EsError, Unit]] = {
for {
r <- {
Task
.deferFuture(c.flushdb())
.materialize
.map {
case S(isSuccessful) =>
if (isSuccessful) ().right
else
throw new RuntimeException(
s"redis clear command failure [res=$isSuccessful]"
)
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
private def keys(
matchGlob: Option[CacheKeyGlob]
): Task[\/[EsError, Seq[String]]] = {
for {
r <- {
Task
.deferFuture(c.keys(matchGlob.map(Tag.unwrap).getOrElse("*")))
.materialize
.map {
case S(xs) => xs.right
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment