-- hedis wrapper for Redis Sentinel support
{-# LANGUAGE DeriveDataTypeable, FlexibleContexts, MultiParamTypeClasses, NamedFieldPuns,
NoImplicitPrelude, OverloadedStrings, RankNTypes, ScopedTypeVariables, TupleSections,
ConstraintKinds, TemplateHaskell, StandaloneDeriving #-}
{-# OPTIONS_GHC -funbox-strict-fields -Wall -Werror #-}
-- dependencies: hedis retry safe data-default uuid monad-logger basic-prelude lifted-base either
-- | Wrapper for hedis to support redis-sentinel. It is
-- built atop, and re-exports most of, the "Database.Redis" module.
--
-- Use runRedisRetry to have the commands retried automatically in case of failover.
--
-- Note this does not support transactions currently. The Redis documentation recommends
-- the use of EVAL instead of transactions anyway, and that they eventually "may deprecate
-- and finally remove transactions" (<http://redis.io/topics/transactions>).
module Database.Redis.Sentinel (
-- * Connection
connect
, Connection
, ConnectInfo (..)
-- * runRedis with retries and Sentinel support
, runRedis
, runRedisRetry
, runRedisRetryDefault
, defaultRetrySettings
-- * Publish/subscribe
, runPublish
, runPubSub
, MonadRedisSentinel
-- * Re-export Control.Retry
, module Control.Retry
-- * Re-export Database.Redis
, module Database.Redis
) where
import BasicPrelude
import Control.Concurrent.Lifted (threadDelay)
import Control.Concurrent.MVar.Lifted (MVar, modifyMVar, modifyMVar_, newMVar)
import Control.Exception.Lifted (Handler (Handler), catches)
import Control.Monad.Catch (Handler (Handler), MonadCatch)
import Control.Monad.Logger (MonadLogger, logWarnS)
import Control.Monad.Trans.Either (hoistEither, runEitherT)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Retry (RetrySettings (..), limitedRetries, unlimitedRetries)
import qualified Control.Retry as Retry
import qualified Data.ByteString as BS
import Data.Default (def)
import qualified Data.Text as Text
import Data.Text.Encoding as Text
import qualified Data.Time as Time
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import Database.Redis hiding (ConnectInfo, Connection, connect,
defaultConnectInfo, runRedis)
import qualified Database.Redis as Redis
import qualified Safe
-- | Publish a message on a channel; see 'publish'. The command is retried a
-- limited number of times if the Redis server is unreachable.
runPublish :: MonadRedisSentinel m
           => Connection -- ^ Redis connection
           -> ByteString -- ^ Channel
           -> ByteString -- ^ Message
           -> m (Either Reply Integer)
runPublish conn channel message =
    runRedisRetry publishRetrySettings conn (publish channel message)
  where
    -- Deliberately slower than 'defaultRetrySettings' (larger base delay,
    -- fewer attempts) so that subscribers get a chance to reconnect first.
    -- TODO: would be better to retry at normal speed, but have a delay
    -- after reconnection is successful
    publishRetrySettings :: RetrySettings
    publishRetrySettings = defaultRetrySettings
        { numRetries = limitedRetries 5
        , baseDelay = 3200 }
-- | See 'pubSub'. This will retry forever if the Redis server is unreachable.
--
-- Each attempt runs 'pubSub' through 'runRedis'. If the attempt ends with an
-- 'IOException' or 'ConnectionLostException' the failure is logged and, after
-- a delay, a new attempt is started with the initial subscriptions. The
-- function returns normally once 'pubSub' itself returns without such an
-- exception.
--
-- The delay between attempts doubles after every failure, is capped at
-- 'maxRetryDelay', and is reset to 'initRetryDelay' whenever the failed
-- attempt lasted at least as long as the delay that was pending for it.
runPubSub :: MonadRedisSentinel m
          => Connection
          -- ^ Redis connection
          -> PubSub
          -- ^ Initial subscriptions
          -> (Message -> IO PubSub)
          -- ^ Callback function
          -> m ()
runPubSub conn initial callback =
    go initRetryDelay
  where
    -- One subscription attempt. The exception handler only *records* the
    -- failure; the decision to retry is taken after 'catchRedis' has returned.
    -- Retrying from inside the handler would run every later attempt (and the
    -- user callback) with asynchronous exceptions masked, and would nest one
    -- more handler frame per reconnect.
    go retryDelay = do
        startTime <- liftIO Time.getCurrentTime
        failure <- (runRedis conn (pubSub initial callback >> return (Right ())) >> return Nothing)
            `catchRedis` (return . Just)
        case failure of
            Nothing -> return ()
            Just e -> retry retryDelay startTime e

    -- Log the failure and pick the delay to sleep before the next attempt.
    retry delay startTime e = do
        curTime <- liftIO Time.getCurrentTime
        $(logWarnS) "Database.Redis.Sentinel" $ "runPubSub: " ++ e
        -- Elapsed time is converted to microseconds to compare with 'delay'.
        delayGo $ if round ((curTime `Time.diffUTCTime` startTime) * 1000000) >= delay
            then initRetryDelay
            else min delay maxRetryDelay

    -- Sleep, then start the next attempt with a doubled pending delay.
    delayGo delay = do
        threadDelay delay
        go (delay * 2)

    -- Delays are passed to 'threadDelay', i.e. they are in microseconds.
    initRetryDelay = 50000
    maxRetryDelay = 1000000
-- | Run a Redis action with automatic retries, using 'defaultRetrySettings'
-- as the retry policy, if the Redis server is or becomes unreachable.
-- This is 'runRedisRetry' with the settings argument fixed; see there for
-- more details.
runRedisRetryDefault :: MonadRedisSentinel m
                     => forall a. Connection
                     -> Redis (Either Reply a)
                     -> m (Either Reply a)
runRedisRetryDefault conn action =
    runRedisRetry defaultRetrySettings conn action
-- | Run a Redis action through 'runRedis', retrying it according to the given
-- retry settings if the Redis server is or becomes unreachable. See
-- 'runRedis' and 'RetrySettings' for details about the arguments.
--
-- A retry is requested when the action throws an 'IOException' or a
-- 'ConnectionLostException'; each such failure is logged as a warning first.
runRedisRetry :: MonadRedisSentinel m
              => forall a. RetrySettings
              -> Connection
              -> Redis (Either Reply a)
              -> m (Either Reply a)
runRedisRetry retrySettings conn action =
    Retry.recovering retrySettings retryHandlers (runRedis conn action)
  where
    -- Both handlers answer 'True', i.e. "retry", after logging the exception.
    retryHandlers =
        [ Control.Monad.Catch.Handler (\(err :: IOException) -> logAndRetry (show err))
        , Control.Monad.Catch.Handler (\(err :: ConnectionLostException) -> logAndRetry (show err))
        ]

    logAndRetry reason = do
        $(logWarnS) "Database.Redis.Sentinel" $ "runRedisRetry: " ++ reason
        return True
-- | Interact with a Redis datastore. See 'Database.Redis.runRedis' for details.
--
-- If using redis-sentinel, this will attempt to retrieve the current master and connect to it if
-- the previous runRedis failed in a way that may indicate a problem with the server.
--
-- An 'IOException' or 'ConnectionLostException' raised while running the
-- action is rethrown to the caller after the connection has been flagged for
-- a failover check; a @READONLY@ error reply is returned to the caller as-is
-- after the same flagging.
runRedis :: MonadRedisSentinel m
         => Connection
         -> Redis (Either Reply a)
         -> m (Either Reply a)
runRedis (Connection connMVar) action = do
    -- Failover approach based on http://redis.io/topics/sentinel-clients.
    -- May want to revisit based on whether consensus is reached on this thread:
    -- http://grokbase.com/t/gg/redis-db/1353dnqesp/redis-sentinel-lets-improve-clients-detection-of-failover-events/oldest
    --
    -- Step 1: under the MVar, decide which base connection to use. The token
    -- returned alongside it identifies the connection state this call saw.
    (baseConn, preToken) <- modifyMVar connMVar $ \oldConnection@Connection'
            { rcCheckFailover
            , rcToken = oldToken
            , rcConnectInfo = oldConnectInfo@ConnectInfo{connectBase = oldBaseConnectInfo}
            , rcBaseConnection = oldBaseConnection } ->
        if rcCheckFailover
            then do
                -- A previous call flagged a possible failover: re-resolve the
                -- master and issue a fresh token. The flag is cleared in both
                -- branches below.
                newConnectInfo@ConnectInfo{connectBase=newBaseConnectInfo} <-
                    updateMaster oldConnectInfo
                newToken <- liftIO $ UUID.nextRandom
                if connectHost newBaseConnectInfo /= connectHost oldBaseConnectInfo ||
                        connectPort newBaseConnectInfo /= connectPort oldBaseConnectInfo
                    then do
                        -- Master address changed: open a new base connection.
                        $(logWarnS) "Database.Redis.Sentinel" $ "runRedis: switching " ++
                            "to new master: " ++ Text.pack (connectHost newBaseConnectInfo) ++
                            ", " ++ show (connectPort newBaseConnectInfo)
                        newConn <- liftIO $ Redis.connect (connectBase newConnectInfo)
                        return ( Connection' False newToken newConnectInfo newConn
                               , (newConn, newToken) )
                    -- Same master as before: keep using the existing base connection.
                    else return ( Connection' False newToken newConnectInfo oldBaseConnection
                                , (oldBaseConnection, newToken) )
            -- No failover check pending: state is left untouched.
            else return (oldConnection, (oldBaseConnection, oldToken))
    -- Step 2: run the action on the chosen connection and flag the state for
    -- a failover check if the outcome suggests the server is gone or demoted.
    liftIO $ do
        reply <- seqRunRedis baseConn
            `catchRedisRethrow` (\_ -> do
                setCheckSentinel preToken
                -- 'catchRedisRethrow' rethrows after this handler runs, so
                -- this value is never returned; it only satisfies the type.
                return $ Left $ Integer 0)
        case reply of
            Left (Error e)| "READONLY " `BS.isPrefixOf` e ->
                -- This means our connection has turned into a slave
                setCheckSentinel preToken
            _ -> return ()
        return reply
  where
    seqRunRedis conn = do
        -- Use `seq` so that we can catch ConnectionLostException
        r <- Redis.runRedis conn action
        seq r $ return r
    -- Request a failover check on the next 'runRedis', but only if the state
    -- still carries the token this call started with; otherwise the state has
    -- already been changed since and is left alone.
    setCheckSentinel preToken =
        modifyMVar_ connMVar $ \conn@Connection'{rcToken} ->
            if preToken == rcToken
                then do
                    newToken <- UUID.nextRandom
                    return (conn{rcToken = newToken, rcCheckFailover = True})
                else return conn
-- | Opens a 'Connection' to a Redis server designated by the given 'ConnectInfo'.
--
-- The connect info is first passed through 'updateMaster', so when sentinels
-- are configured the base connection is opened against the master they report.
connect :: MonadRedisSentinel m
        => ConnectInfo
        -> m Connection
connect origConnectInfo = do
    resolvedInfo <- updateMaster origConnectInfo
    liftIO $ do
        baseConn <- Redis.connect (connectBase resolvedInfo)
        initialToken <- UUID.nextRandom
        -- Initial state: no failover check pending.
        stateVar <- newMVar (Connection' False initialToken resolvedInfo baseConn)
        return (Connection stateVar)
-- | Resolve the current master through the configured sentinels.
--
-- With an empty sentinel list the connect info is returned unchanged.
-- Otherwise the sentinels are asked in order for the address of the master
-- named by 'connectName'; the first usable answer is written into the
-- host/port of 'connectBase', and the answering sentinel is moved to the
-- front of 'connectSentinels'.
--
-- A sentinel is skipped (with a warning) if talking to it raises an
-- 'IOException' or 'ConnectionLostException', or if its reply is not a
-- host/port pair with a valid TCP port.
--
-- Throws 'SentinelException' if no sentinel yields a usable master address.
updateMaster :: MonadRedisSentinel m
             => ConnectInfo
             -> m ConnectInfo
updateMaster connectInfo@(ConnectInfo [] _ _) = return connectInfo
updateMaster connectInfo@(ConnectInfo sentinels masterName baseConnectInfo) = do
    -- This is using the Either monad "backwards" -- Left means stop because we've made a connection,
    -- Right means try again.
    resultEither <- runEitherT $ forM_ sentinels $ \sentinelPair ->
        hoistEither =<< lift (trySentinel sentinelPair `catchRedis` sentinelFailed sentinelPair)
    case resultEither of
        Left result -> return result
        Right () -> throwIO $ SentinelException $ "Database.Redis.Sentinel.updateMaster: unable to get master '" ++
            masterName ++ "' from sentinels " ++ show sentinels ++ "."
  where
    -- Ask one sentinel for the master address.
    trySentinel :: MonadRedisSentinel m => (HostName, PortID) -> m (Either ConnectInfo ())
    trySentinel sentinelPair@(sentinelHost, sentinelPort) = do
        sentinelConn <- liftIO $ Redis.connect $ Redis.defaultConnectInfo
            { connectHost = sentinelHost
            , connectPort = sentinelPort
            , connectMaxConnections = 1 }
        replyE <- liftIO $ Redis.runRedis sentinelConn $ sendRequest
            ["SENTINEL", "get-master-addr-by-name", Text.encodeUtf8 masterName]
        seq replyE $ return () -- Need seq so ConnectionLostException catchable
        case replyE of
            Right [host, port]
                -- Only accept a port that parses and fits a TCP port number.
                -- Guessing a default here would point the client at the wrong
                -- server (26379 is the conventional Sentinel port, not a
                -- Redis one), and an out-of-range value would silently wrap
                -- in 'fromInteger'. Anything else falls through to the
                -- "invalid response" case so the next sentinel is tried.
                | Just portNum <- Safe.readMay (Text.unpack (Text.decodeUtf8 port))
                , portNum >= 1
                , portNum <= 65535 ->
                    return $ Left connectInfo
                        { connectSentinels = sentinelPair : delete sentinelPair sentinels
                        , connectBase = baseConnectInfo
                            { connectHost = Text.unpack $ Text.decodeUtf8 host
                            , connectPort = PortNumber $ fromInteger portNum } }
            v -> do
                $(logWarnS) "Database.Redis.Sentinel" $ "updateMaster: invalid response from sentinel " ++
                    show sentinelPair ++ ": " ++ show v
                return $ Right ()

    -- Exception handler for 'trySentinel': log and move on to the next sentinel.
    sentinelFailed :: MonadRedisSentinel m => (HostName, PortID) -> Text -> m (Either ConnectInfo ())
    sentinelFailed sentinelPair showEx = do
        $(logWarnS) "Database.Redis.Sentinel" $ "updateMaster: exception getting master from sentinel " ++
            show sentinelPair ++ ": " ++ showEx
        return $ Right ()
-- | Run an action; if it throws an 'IOException' or a
-- 'ConnectionLostException', run the handler on the shown exception for its
-- effects only and then rethrow the original exception. The handler's result
-- is discarded.
catchRedisRethrow :: MonadBaseControl IO m
                  => m a -> (Text -> m a) -> m a
catchRedisRethrow action handler =
    catches action
        [ Control.Exception.Lifted.Handler $ \(ioErr :: IOException) -> do
            _ <- handler (show ioErr)
            throwIO ioErr
        , Control.Exception.Lifted.Handler $ \(lostErr :: ConnectionLostException) -> do
            _ <- handler (show lostErr)
            throwIO lostErr
        ]
-- | Run an action; if it throws an 'IOException' or a
-- 'ConnectionLostException', recover by passing the shown exception to the
-- handler and returning the handler's result. Other exceptions propagate.
catchRedis :: MonadBaseControl IO m
           => m a -> (Text -> m a) -> m a
catchRedis action handler =
    catches action
        [ Control.Exception.Lifted.Handler (\(ioErr :: IOException) -> handler (show ioErr))
        , Control.Exception.Lifted.Handler (\(lostErr :: ConnectionLostException) -> handler (show lostErr))
        ]
-- | A threadsafe pool of network connections to a Redis server. Use the 'connect' function to create one.
--
-- Internally this is an 'MVar' holding the current base connection together
-- with the failover bookkeeping (see @Connection'@), which 'runRedis' reads
-- and updates.
newtype Connection = Connection (MVar Connection')
-- Mutable state behind a 'Connection' (kept inside its 'MVar').
data Connection' = Connection'
    { rcCheckFailover :: Bool
    -- When True, the next 'runRedis' calls 'updateMaster' before running its
    -- action, and then clears the flag.
    , rcToken :: UUID.UUID
    -- Random token replaced whenever the state is changed; 'runRedis' uses it
    -- to flag a failover check only if the state is still the one it started
    -- with.
    , rcConnectInfo :: ConnectInfo
    -- Connect info as last returned by 'updateMaster'.
    , rcBaseConnection :: Redis.Connection }
    -- Underlying hedis connection that actions are run on.
-- | Information for connecting to a Redis server.
--
-- It is recommended to not use the constructor directly. Instead use getConnectInfo and update it with record syntax.
-- NOTE(review): no @getConnectInfo@ is defined or exported in the visible
-- part of this module -- confirm where it is meant to come from.
data ConnectInfo = ConnectInfo
    { connectSentinels :: [(HostName, PortID)]
    -- ^ List of sentinels. If empty, sentinels will not be used
    , connectName :: Text
    -- ^ Name of master to connect to (only relevant if using sentinels)
    , connectBase :: Redis.ConnectInfo
    -- ^ Connection info for base library. The host/port are only used if not using sentinels.
    }
-- | Retry settings used by 'runRedisRetryDefault' and as the base for
-- 'runPublish': the library defaults with the number of retries limited to 11.
-- This will retry for approximately one minute, using exponential backoff.
defaultRetrySettings :: RetrySettings
defaultRetrySettings = stockSettings{numRetries = limitedRetries 11}
  where
    stockSettings :: RetrySettings
    stockSettings = def
-- | Exception thrown by "Database.Redis.Sentinel".
data RedisSentinelException
    = SentinelException Text
    -- ^ Thrown if no sentinel can be reached. Raised by @updateMaster@ when
    -- none of the configured sentinels yields a master address; the 'Text'
    -- is a human-readable message naming the master and the sentinels tried.
    deriving (Show, Typeable)
-- Default 'Exception' methods are sufficient; this only makes the type throwable.
instance Exception RedisSentinelException
-- | Constraint alias for the monads the functions in this module run in:
-- they need 'MonadIO' (Redis and clock access), 'MonadLogger' (warnings are
-- logged on failures and master switches), 'MonadBaseControl' 'IO' (lifted
-- 'MVar' and exception handling) and 'Control.Monad.Catch.MonadCatch'
-- (required by the retry combinator used in 'runRedisRetry').
type MonadRedisSentinel m = (MonadIO m, MonadLogger m, MonadBaseControl IO m, Control.Monad.Catch.MonadCatch m)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.