Skip to content

Instantly share code, notes, and snippets.

@machuz
Created July 31, 2018 05:33
Show Gist options
  • Save machuz/230f17538f6790d7a6afbc430e3db96b to your computer and use it in GitHub Desktop.
Save machuz/230f17538f6790d7a6afbc430e3db96b to your computer and use it in GitHub Desktop.
redis実装パターン2(transactionを利用して動的にDB番号を切り替える)
// ↓のようにもかけるがテストしづらいのでやめた
//val t = c.multi( redis => {
// redis.select(Tag.unwrap(dbNum))
// redis.get(Tag.unwrap(key)
//})
// 動的DB変更案はトランザクションにロールバックがないredisで、
// 複数コマンド叩かないと整合性が取れない状態を生みやすいので厳しいかも
sealed abstract class RedisClient {
def put[A: ByteStringFormatter](
key: CacheKey,
value: A,
dbNum: DBNum,
expireSeconds: Option[Long] = None
): Task[\/[EsError, A]]
def putList[A: ByteStringFormatter](
key: CacheKey,
value: Seq[A],
dbNum: DBNum,
expireSeconds: Option[Long] = None
): Task[\/[EsError, Seq[A]]]
def scan[A: ByteStringFormatter](
matchGlob: Option[CacheKeyGlob],
dbNum: DBNum,
cursor: Int = 0,
count: Option[Int]
): Task[\/[EsError, Seq[A]]]
def get[A: ByteStringFormatter](
key: CacheKey,
dbNum: DBNum
): Task[\/[EsError, Option[A]]]
def getList[A: ByteStringFormatter](
key: CacheKey,
dbNum: DBNum
): Task[\/[EsError, Seq[A]]]
def delete(key: CacheKey, dbNum: DBNum): Task[\/[EsError, Unit]]
def exists(key: CacheKey, dbNum: DBNum): Task[\/[EsError, Boolean]]
def clear(dbNum: DBNum): Task[\/[EsError, Unit]]
}
object RedisClient {
import io.circe.syntax._
def byteStringFormatter[A](implicit encoder: Encoder[A], decoder: Decoder[A]): ByteStringFormatter[A] =
new ByteStringFormatter[A] {
def serialize(data: A): ByteString = ByteString(data.asJson.noSpaces)
def deserialize(bs: ByteString): A = {
\/.fromEither(io.circe.parser.decode[A](bs.utf8String)) match {
case \/-(r) => r
case -\/(e) => throw e
}
}
implicit val redisReplyDeserializer = new RedisReplyDeserializer[A] {
override def deserialize: PartialFunction[RedisReply, A] = {
case Bulk(Some(bs)) => byteStringFormatter.deserialize(bs)
}
}
}
}
class RedisClientImpl @Inject()(
system: ActorSystem
) extends RedisClient {
protected val c =
RediscalaClient(
SharedAdapterConf.redis.master.host,
SharedAdapterConf.redis.master.port,
SharedAdapterConf.redis.master.password
)(_system = system)
final def get[A: ByteStringFormatter](
key: CacheKey,
dbNum: DBNum
): Task[\/[EsError, Option[A]]] = {
for {
r <- {
Task
.deferFuture {
val t = c.multi()
t.select(Tag.unwrap(dbNum))
t.get(Tag.unwrap(key))
t.exec()
}
.materialize
.map {
case S(MultiBulk(Some(Vector(selectRes, getRes)))) if selectRes.toByteString.utf8String == "OK" =>
val fmt = implicitly[ByteStringFormatter[A]]
fmt.deserialize(getRes.toByteString).some.right
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
final def getList[A: ByteStringFormatter](
key: CacheKey,
dbNum: DBNum
): Task[\/[EsError, Seq[A]]] = {
for {
r <- {
Task
.deferFuture {
val t = c.multi()
t.select(Tag.unwrap(dbNum))
t.lrange[A](Tag.unwrap(key), 0, 9999)
t.exec()
}
.materialize
.map {
case S(MultiBulk(Some(Vector(selectRes, MultiBulk(Some(range))))))
if selectRes.toByteString.utf8String == "OK" =>
val fmt = implicitly[ByteStringFormatter[A]]
range.flatMap {
case x: Bulk => x.response.map(fmt.deserialize)
case _ => Nil
}.right
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
final def scan[A: ByteStringFormatter](
matchGlob: Option[CacheKeyGlob],
dbNum: DBNum,
cursor: Int = 0,
count: Option[Int] = None
): Task[\/[EsError, Seq[A]]] = {
for {
keySeqTask <- keys(matchGlob, dbNum)
r <- {
keySeqTask match {
case \/-(keySeq) if keySeq.nonEmpty =>
Task
.deferFuture {
val t = c.multi()
t.select(Tag.unwrap(dbNum))
t.mget(keySeq: _*)
t.exec()
}
.materialize
.map {
case S(MultiBulk(Some(Vector(selectRes, MultiBulk(Some(scanResSeq))))))
if selectRes.toByteString.utf8String == "OK" =>
val fmt = implicitly[ByteStringFormatter[A]]
scanResSeq.flatMap {
case x: Bulk => x.response.map(fmt.deserialize)
case _ => Nil
}.right
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
case \/-(keySeq) if keySeq.isEmpty =>
Task.now(Nil.right)
case -\/(e) => Task.now(e.left)
}
}
} yield r
}
final def putList[A: ByteStringFormatter](
key: CacheKey,
values: Seq[A],
dbNum: DBNum,
expireSeconds: Option[Long] = None
): Task[\/[EsError, Seq[A]]] = {
for {
r <- {
Task
.deferFuture {
val t = c.multi()
t.select(Tag.unwrap(dbNum))
t.lpush(Tag.unwrap(key), values: _*)
if (expireSeconds.isDefined) t.expire(Tag.unwrap(key), expireSeconds.get)
else Future.successful(true)
t.exec()
}
.materialize
.map {
case S(MultiBulk(Some(Vector(selectRes, _)))) if selectRes.toByteString.utf8String == "OK" =>
values.right
case S(MultiBulk(Some(Vector(selectRes, _, _)))) if selectRes.toByteString.utf8String == "OK" =>
values.right
case S(res) =>
throw new RuntimeException(res.toString)
case F(throwable) =>
NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
final def put[A: ByteStringFormatter](
key: CacheKey,
value: A,
dbNum: DBNum,
expireSeconds: Option[Long] = None
): Task[\/[EsError, A]] = {
for {
r <- {
Task
.deferFuture {
val t = c.multi()
t.select(Tag.unwrap(dbNum))
t.set(Tag.unwrap(key), value, expireSeconds)
t.exec()
}
.materialize
.map {
case S(MultiBulk(Some(Vector(selectRes, putRes))))
if Seq(selectRes, putRes).map(_.toByteString.utf8String).forall(_ == "OK") =>
value.right
case S(_) =>
throw new RuntimeException("")
case F(throwable) =>
NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
// def bulkPut[A: ByteStringFormatter](
// keyValues: Map[CacheKey, A],
// dbNum: DBNum,
// expireSeconds: Option
// ) = {
// c.mset()
// }
final def delete(
key: CacheKey,
dbNum: DBNum
): Task[\/[EsError, Unit]] = {
for {
r <- {
Task
.deferFuture {
val t = c.multi()
t.select(Tag.unwrap(dbNum))
t.del(Tag.unwrap(key))
t.exec()
}
.materialize
.map {
case S(MultiBulk(Some(Vector(selectRes, delRes)))) if selectRes.toByteString.utf8String == "OK" =>
if (delRes.toByteString.utf8String == "1") ().right
else {
EsLogger.warn("redis.delete", Map("msg" -> s"key is not found [$key]"))
().right
}
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
final def exists(
key: CacheKey,
dbNum: DBNum
): Task[\/[EsError, Boolean]] = {
for {
r <- {
Task
.deferFuture {
val t = c.multi()
t.select(Tag.unwrap(dbNum))
t.exists(Tag.unwrap(key))
t.exec()
}
.materialize
.map {
case S(MultiBulk(Some(Vector(selectRes, existsRes)))) if selectRes.toByteString.utf8String == "OK" =>
if (existsRes.toByteString.utf8String == "1") true.right
else false.right
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
final def clear(dbNum: DBNum): Task[\/[EsError, Unit]] = {
for {
r <- {
Task
.deferFuture {
val t = c.multi()
t.select(Tag.unwrap(dbNum))
t.flushdb()
t.exec()
}
.materialize
.map {
case S(MultiBulk(Some(Vector(selectRes, clearRes))))
if Seq(selectRes, clearRes).map(_.toByteString.utf8String).forall(_ == "OK") =>
().right
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
private def keys(
matchGlob: Option[CacheKeyGlob],
dbNum: DBNum
): Task[\/[EsError, Vector[String]]] = {
for {
r <- {
Task
.deferFuture {
val t = c.multi()
t.select(Tag.unwrap(dbNum))
t.keys(matchGlob.map(Tag.unwrap).getOrElse("*"))
t.exec()
}
.materialize
.map {
case S(MultiBulk(Some(Vector(selectRes, MultiBulk(Some(keysRes))))))
if selectRes.toByteString.utf8String == "OK" =>
keysRes.map(_.toString).right
case F(throwable) => NonFatalError(throwable, ErrorCode.REDIS_REQUEST_ERROR).left
}
}
} yield r
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment