-
-
Save dminuoso/0c01e714f1ccd0c5986ee6ed5af0c2fe to your computer and use it in GitHub Desktop.
hedis wrapper for Redis Sentinel support
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE DeriveDataTypeable, FlexibleContexts, MultiParamTypeClasses, NamedFieldPuns, | |
NoImplicitPrelude, OverloadedStrings, RankNTypes, ScopedTypeVariables, TupleSections, | |
ConstraintKinds, TemplateHaskell, StandaloneDeriving #-} | |
{-# OPTIONS_GHC -funbox-strict-fields -Wall -Werror #-} | |
-- dependencies: hedis retry safe data-default uuid monad-logger basic-prelude lifted-base either | |
-- | Wrapper for hedis that adds redis-sentinel support. It is
-- built on top of, and re-exports most of, the "Database.Redis" module.
-- | |
-- Use runRedisRetry to have the commands retried automatically in case of failover. | |
-- | |
-- Note this does not support transactions currently. The Redis documentation recommends | |
-- the use of EVAL instead of transactions anyway, and that they eventually "may deprecate | |
-- and finally remove transactions" (<http://redis.io/topics/transactions>). | |
module Database.Redis.Sentinel ( | |
-- * Connection | |
connect | |
, Connection | |
, ConnectInfo (..) | |
-- * runRedis with retries and Sentinel support
, runRedis | |
, runRedisRetry | |
, runRedisRetryDefault | |
, defaultRetrySettings | |
-- * Publish/subscribe | |
, runPublish | |
, runPubSub | |
, MonadRedisSentinel | |
-- * Re-export Control.Retry | |
, module Control.Retry | |
-- * Re-export Database.Redis | |
, module Database.Redis | |
) where | |
import BasicPrelude | |
import Control.Concurrent.Lifted (threadDelay) | |
import Control.Concurrent.MVar.Lifted (MVar, modifyMVar, modifyMVar_, newMVar) | |
import Control.Exception.Lifted (Handler (Handler), catches) | |
import Control.Monad.Catch (Handler (Handler), MonadCatch) | |
import Control.Monad.Logger (MonadLogger, logWarnS) | |
import Control.Monad.Trans.Either (hoistEither, runEitherT) | |
import Control.Monad.Trans.Control (MonadBaseControl) | |
import Control.Retry (RetrySettings (..), limitedRetries, unlimitedRetries) | |
import qualified Control.Retry as Retry | |
import qualified Data.ByteString as BS | |
import Data.Default (def) | |
import qualified Data.Text as Text | |
import Data.Text.Encoding as Text | |
import qualified Data.Time as Time | |
import qualified Data.UUID as UUID | |
import qualified Data.UUID.V4 as UUID | |
import Database.Redis hiding (ConnectInfo, Connection, connect, | |
defaultConnectInfo, runRedis) | |
import qualified Database.Redis as Redis | |
import qualified Safe | |
-- | Run 'publish' against the given connection. The command is retried a
-- limited number of times if the Redis server is unreachable.
runPublish :: MonadRedisSentinel m
           => Connection -- ^ Redis connection
           -> ByteString -- ^ Channel
           -> ByteString -- ^ Message
           -> m (Either Reply Integer)
runPublish conn channel message =
    runRedisRetry publishRetrySettings conn (publish channel message)
  where
    -- Retries are deliberately slower than the defaults, in order to give
    -- subscribers a chance to reconnect first.
    -- TODO: would be better to retry at normal speed, but have a delay
    -- after reconnection is successful
    publishRetrySettings = defaultRetrySettings
        { numRetries = limitedRetries 5
        , baseDelay = 3200
        }
-- | See 'pubSub'. This will retry forever if the Redis server is unreachable.
--
-- Each attempt runs 'pubSub' through 'runRedis'. If the attempt ends with an
-- 'IOException' or 'ConnectionLostException' (the exceptions handled by
-- 'catchRedis'), a warning is logged, the thread sleeps, and the subscription
-- is started again from the initial subscriptions. If 'pubSub' returns
-- without such an exception, 'runPubSub' returns.
--
-- The sleep starts at 50ms and doubles after every consecutive failure, up to
-- a cap of one second. It is reset to 50ms whenever the failed attempt lasted
-- at least as long as the delay that was scheduled for it.
runPubSub :: MonadRedisSentinel m
          => Connection
          -- ^ Redis connection
          -> PubSub
          -- ^ Initial subscriptions
          -> (Message -> IO PubSub)
          -- ^ Callback function
          -> m ()
runPubSub conn initial callback =
    go initRetryDelay
  where
    -- Delays are in microseconds, as expected by 'threadDelay'.
    initRetryDelay, maxRetryDelay :: Int
    initRetryDelay = 50000
    maxRetryDelay = 1000000

    go retryDelay = do
        startTime <- liftIO Time.getCurrentTime
        -- Turn the failure into a value instead of looping from inside the
        -- exception handler: retrying inside the handler would nest one more
        -- handler per reconnect attempt and would run every later attempt
        -- with asynchronous exceptions masked.
        failure <- (runRedis conn (pubSub initial callback >> return (Right ())) >> return Nothing)
            `catchRedis` (\shownEx -> return (Just shownEx))
        case failure of
            Nothing -> return ()
            Just shownEx -> do
                curTime <- liftIO Time.getCurrentTime
                $(logWarnS) "Database.Redis.Sentinel" $ "runPubSub: " ++ shownEx
                let elapsedMicros = round ((curTime `Time.diffUTCTime` startTime) * 1000000) :: Int
                    -- An attempt that outlived its scheduled delay resets the
                    -- backoff; otherwise keep backing off, up to the cap.
                    delay
                        | elapsedMicros >= retryDelay = initRetryDelay
                        | otherwise = min maxRetryDelay retryDelay
                threadDelay delay
                go (delay * 2)
-- | Run a Redis action with 'runRedisRetry', using 'defaultRetrySettings' as
-- the retry settings. The action is retried if the Redis server is or becomes
-- unreachable; see 'runRedisRetry' for more details.
runRedisRetryDefault :: MonadRedisSentinel m
                     => forall a. Connection
                     -> Redis (Either Reply a)
                     -> m (Either Reply a)
runRedisRetryDefault conn action =
    runRedisRetry defaultRetrySettings conn action
-- | Run a Redis action through 'runRedis', retrying it according to the given
-- retry settings whenever it fails with an 'IOException' or a
-- 'ConnectionLostException'. A warning is logged for every such failure.
-- See 'runRedis' and 'RetrySettings' for details about the arguments.
runRedisRetry :: MonadRedisSentinel m
              => forall a. RetrySettings
              -> Connection
              -> Redis (Either Reply a)
              -> m (Either Reply a)
runRedisRetry retrySettings conn action =
    Retry.recovering
        retrySettings
        [ Control.Monad.Catch.Handler (\(e :: IOException) -> warnAndRetry (show e))
        , Control.Monad.Catch.Handler (\(e :: ConnectionLostException) -> warnAndRetry (show e))
        ]
        (runRedis conn action)
  where
    -- Log the rendered exception and answer 'True' to the retry machinery.
    warnAndRetry shownEx = do
        $(logWarnS) "Database.Redis.Sentinel" $ "runRedisRetry: " ++ shownEx
        return True
-- | Interact with a Redis datastore. See 'Database.Redis.runRedis' for details.
--
-- If a previous call flagged the connection for a failover check (because it
-- failed with an 'IOException' or 'ConnectionLostException', or because the
-- server answered with a @READONLY@ error), the current master is looked up
-- again via 'updateMaster' before the action runs, and a new base connection
-- is opened if the master's host or port changed.
--
-- 'IOException' and 'ConnectionLostException' are rethrown to the caller after
-- the connection has been flagged.
runRedis :: MonadRedisSentinel m
         => Connection
         -> Redis (Either Reply a)
         -> m (Either Reply a)
runRedis (Connection connMVar) action = do
    -- Failover approach based on http://redis.io/topics/sentinel-clients.
    -- May want to revisit based on whether consensus is reached on this thread:
    -- http://grokbase.com/t/gg/redis-db/1353dnqesp/redis-sentinel-lets-improve-clients-detection-of-failover-events/oldest
    (activeConn, seenToken) <- modifyMVar connMVar refreshIfFlagged
    liftIO $ do
        reply <- catchRedisRethrow (forcedRun activeConn) $ \_ -> do
            flagFailover seenToken
            -- Placeholder only: 'catchRedisRethrow' rethrows after this handler.
            return (Left (Integer 0))
        if turnedReadOnly reply
            then flagFailover seenToken
            else return ()
        return reply
  where
    -- Produce the new shared state plus the (connection, token) pair that
    -- this call should use.
    refreshIfFlagged current
        | not (rcCheckFailover current) =
            return (current, (rcBaseConnection current, rcToken current))
        | otherwise = do
            let knownInfo = rcConnectInfo current
                knownBase = connectBase knownInfo
                knownConn = rcBaseConnection current
            masterInfo <- updateMaster knownInfo
            freshToken <- liftIO UUID.nextRandom
            let masterBase = connectBase masterInfo
                masterMoved =
                    connectHost masterBase /= connectHost knownBase ||
                    connectPort masterBase /= connectPort knownBase
            chosenConn <-
                if masterMoved
                    then do
                        $(logWarnS) "Database.Redis.Sentinel" $ "runRedis: switching " ++
                            "to new master: " ++ Text.pack (connectHost masterBase) ++
                            ", " ++ show (connectPort masterBase)
                        liftIO $ Redis.connect masterBase
                    else return knownConn
            return ( Connection' False freshToken masterInfo chosenConn
                   , (chosenConn, freshToken) )

    -- Use `seq` on the result so that ConnectionLostException surfaces here,
    -- where it can be caught.
    forcedRun baseConn = do
        result <- Redis.runRedis baseConn action
        result `seq` return result

    -- A READONLY error means our connection has turned into a slave.
    turnedReadOnly (Left (Error msg)) = "READONLY " `BS.isPrefixOf` msg
    turnedReadOnly _ = False

    -- Request a failover check on the next call, but only if the shared state
    -- still carries the token this call started with; otherwise leave the
    -- state as it is.
    flagFailover token =
        modifyMVar_ connMVar $ \state ->
            if rcToken state /= token
                then return state
                else do
                    replacement <- UUID.nextRandom
                    return state { rcToken = replacement, rcCheckFailover = True }
-- | Opens a 'Connection' to the Redis server designated by the given
-- 'ConnectInfo'. The connect info is first passed through 'updateMaster', and
-- the base connection is opened with the resulting 'connectBase'.
connect :: MonadRedisSentinel m
        => ConnectInfo
        -> m Connection
connect origConnectInfo = do
    resolvedInfo <- updateMaster origConnectInfo
    liftIO $ do
        baseConn <- Redis.connect (connectBase resolvedInfo)
        initialToken <- UUID.nextRandom
        -- The failover flag starts out cleared.
        fmap Connection (newMVar (Connection' False initialToken resolvedInfo baseConn))
-- | Resolve the current master through the configured sentinels.
--
-- With an empty sentinel list the 'ConnectInfo' is returned unchanged.
-- Otherwise each sentinel is asked in turn, via
-- @SENTINEL get-master-addr-by-name@, for the address of the named master.
-- The first usable answer wins: the returned 'ConnectInfo' has the master's
-- host and port in 'connectBase', and the answering sentinel moved to the
-- front of 'connectSentinels'.
--
-- A sentinel that fails with an 'IOException' or 'ConnectionLostException', or
-- that returns anything other than a host and a numeric port, is logged and
-- skipped. If no sentinel yields a master, a 'SentinelException' is thrown.
updateMaster :: MonadRedisSentinel m
             => ConnectInfo
             -> m ConnectInfo
updateMaster connectInfo@(ConnectInfo [] _ _) = return connectInfo
updateMaster connectInfo@(ConnectInfo sentinels masterName baseConnectInfo) = do
    -- This is using the Either monad "backwards" -- Left means stop because we've made a connection,
    -- Right means try again.
    resultEither <- runEitherT $ forM_ sentinels $ \sentinelPair -> do
        hoistEither =<< lift (trySentinel sentinelPair `catchRedis` sentinelFailed sentinelPair)
    case resultEither of
        Left result -> return result
        Right () -> throwIO $ SentinelException $ "Database.Redis.Sentinel.updateMaster: unable to get master '" ++
            masterName ++ "' from sentinels " ++ show sentinels ++ "."
  where
    trySentinel :: MonadRedisSentinel m => (HostName, PortID) -> m (Either ConnectInfo ())
    trySentinel sentinelPair@(sentinelHost, sentinelPort) = do
        -- NOTE(review): sentinelConn is never explicitly released here --
        -- confirm that the base library reclaims idle connections.
        sentinelConn <- liftIO $ Redis.connect $ Redis.defaultConnectInfo
            { connectHost = sentinelHost
            , connectPort = sentinelPort
            , connectMaxConnections = 1 }
        replyE <- liftIO $ Redis.runRedis sentinelConn $ sendRequest
            ["SENTINEL", "get-master-addr-by-name", Text.encodeUtf8 masterName]
        seq replyE $ return () -- Need seq so ConnectionLostException catchable
        case replyE of
            -- Only accept the answer when the port really parses as a number.
            -- An unparsable port used to fall back to 26379 (the sentinel
            -- port), which would point the client at a sentinel instead of
            -- the master; it now falls through to the invalid-response case.
            Right [host, port]
                | Just portNumber <- Safe.readMay (Text.unpack (Text.decodeUtf8 port)) ->
                    return $ Left connectInfo
                        { connectSentinels = sentinelPair : delete sentinelPair sentinels
                        , connectBase = baseConnectInfo
                            { connectHost = Text.unpack $ Text.decodeUtf8 host
                            , connectPort = PortNumber $ fromInteger portNumber } }
            v -> do
                $(logWarnS) "Database.Redis.Sentinel" $ "updateMaster: invalid response from sentinel " ++
                    show sentinelPair ++ ": " ++ show v
                return $ Right ()

    sentinelFailed :: MonadRedisSentinel m => (HostName, PortID) -> Text -> m (Either ConnectInfo ())
    sentinelFailed sentinelPair showEx = do
        $(logWarnS) "Database.Redis.Sentinel" $ "updateMaster: exception getting master from sentinel " ++
            show sentinelPair ++ ": " ++ showEx
        return $ Right ()
-- | Run an action; if it throws an 'IOException' or a
-- 'ConnectionLostException', run the handler on the rendered exception for
-- its effects and then rethrow the original exception. The handler's result
-- is discarded.
catchRedisRethrow :: MonadBaseControl IO m
                  => m a -> (Text -> m a) -> m a
catchRedisRethrow action handler =
    catches action
        [ Control.Exception.Lifted.Handler (\(e :: IOException) -> notifyThenRethrow e)
        , Control.Exception.Lifted.Handler (\(e :: ConnectionLostException) -> notifyThenRethrow e)
        ]
  where
    notifyThenRethrow e = do
        _ <- handler (show e)
        throwIO e
-- | Run an action; if it throws an 'IOException' or a
-- 'ConnectionLostException', recover by running the handler on the rendered
-- exception and returning the handler's result.
catchRedis :: MonadBaseControl IO m
           => m a -> (Text -> m a) -> m a
catchRedis action handler =
    catches action
        [ Control.Exception.Lifted.Handler onIOException
        , Control.Exception.Lifted.Handler onConnectionLost
        ]
  where
    onIOException e = handler (show (e :: IOException))
    onConnectionLost e = handler (show (e :: ConnectionLostException))
-- | A threadsafe pool of network connections to a Redis server. Use the
-- 'connect' function to create one.
newtype Connection =
    Connection (MVar Connection')

-- Mutable state held inside a 'Connection'.
data Connection' = Connection'
    { rcCheckFailover  :: Bool
      -- ^ When set, the next 'runRedis' looks the master up again before
      -- running its action.
    , rcToken          :: UUID.UUID
      -- ^ Replaced with a fresh random UUID whenever 'runRedis' re-checks the
      -- master or flags the connection for a failover check.
    , rcConnectInfo    :: ConnectInfo
      -- ^ Connect info as last returned by 'updateMaster'.
    , rcBaseConnection :: Redis.Connection
      -- ^ Connection of the base library that actions are run on.
    }
-- | Information for connecting to a Redis server.
--
-- It is recommended to not use the constructor directly. Instead use
-- getConnectInfo and update it with record syntax.
data ConnectInfo = ConnectInfo
    { connectSentinels :: [(HostName, PortID)]
      -- ^ List of sentinels. If empty, sentinels will not be used.
    , connectName      :: Text
      -- ^ Name of master to connect to (only relevant if using sentinels).
    , connectBase      :: Redis.ConnectInfo
      -- ^ Connection info for base library. The host/port are only used if
      -- not using sentinels.
    }
-- | Default retry settings for 'runRedisRetry': the library defaults ('def')
-- with the number of retries limited to 11. This will retry for
-- approximately one minute, using exponential backoff.
defaultRetrySettings :: RetrySettings
defaultRetrySettings = libraryDefaults { numRetries = retryBudget }
  where
    libraryDefaults = def
    retryBudget = limitedRetries 11
-- | Exception thrown by "Database.Redis.Sentinel".
data RedisSentinelException
    = SentinelException Text
      -- ^ Thrown if no sentinel can be reached. The 'Text' is a
      -- human-readable description of the failure.
    deriving (Show, Typeable)

instance Exception RedisSentinelException
-- | Constraint alias collecting the capabilities required of a monad by the
-- functions in this module: lifting 'IO', logging, 'IO' base control, and
-- exception catching.
type MonadRedisSentinel m =
    ( MonadIO m
    , MonadLogger m
    , MonadBaseControl IO m
    , Control.Monad.Catch.MonadCatch m
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment