Created
November 26, 2018 17:55
-
-
Save roberth/7f5fae2178e7574eb7dcdd0dc6e32246 to your computer and use it in GitHub Desktop.
Idea for stopping work gracefully
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Control.Concurrent.Extras.Gate where | |
import Protolude | |
import Data.IORef | |
import Control.Concurrent.STM | |
import Data.UUID | |
import qualified Data.UUID.V4 as UUID | |
import qualified Data.Map as M | |
-- | Shared state of a multi-task process, used to coordinate graceful
-- termination: a flag that refuses new work plus a registry of the work
-- that is currently running.
data WorkGate description = WorkGate
  { -- | 'Nothing' while the gate accepts work. Once set to @'Just' e@,
    -- 'gate' throws @e@ instead of starting the computation.
    noMoreWorkException :: IORef (Maybe SomeException)
    -- | Computations currently running through 'gate', keyed by the thread
    -- that registered them and a random 'UUID', each mapped to its
    -- caller-supplied description.
  , inProgress :: TVar (Map (ThreadId, UUID) description)
  }
-- | Run a computation unless the gate has been shut, keeping track of its
-- execution in 'inProgress' by means of the @description@.
--
-- If 'noMoreWorkException' holds an exception, that exception is thrown
-- with 'throwIO' and @io@ is never started. Otherwise the computation is
-- registered under a fresh @('ThreadId', 'UUID')@ key, run, and
-- unregistered again when it finishes or is interrupted by an exception
-- (the registration is managed by 'bracket').
--
-- I don't think @IO a@ needs to become aware of its key. If it does
-- we'll have @gate' :: bla -> ((ThreadId, UUID) -> IO a) -> IO a@.
--
-- NOTE(review): the shut check reads an 'IORef' and the registration is a
-- separate STM transaction, so a 'shut' landing between the two steps does
-- not prevent this computation from starting.
gate :: WorkGate description -> description -> IO a -> IO a
gate workGate description io = do
  -- Refuse new work once an exception has been stored in the gate.
  shutWith <- readIORef (noMoreWorkException workGate)
  mapM_ throwIO shutWith
  bracket add remove (const io)
  where
    -- Register this computation and hand its key to the release action.
    add :: IO (ThreadId, UUID)
    add = do
      key <- (,) <$> myThreadId <*> UUID.nextRandom
      atomically $
        modifyTVar' (inProgress workGate) (M.insert key description)
      pure key

    -- Unregister the computation. This must delete the key: re-inserting
    -- it would leave the entry in the map forever, and 'await' would then
    -- never observe an empty map.
    remove :: (ThreadId, UUID) -> IO ()
    remove key =
      atomically $
        modifyTVar' (inProgress workGate) (M.delete key)
-- | Stop accepting work by storing the given exception in the gate.
-- Call 'await' after this.
shut
  :: WorkGate description
  -> SomeException
  -- ^ Exception that will be thrown when attempting to start more work.
  -> IO ()
shut workGate exc =
  atomicWriteIORef (noMoreWorkException workGate) (Just exc)
-- | Block until no computation is registered in 'inProgress'.
-- Does not make sense to call before 'shut'.
--
-- While work remains, the callback is invoked with the current snapshot of
-- the in-progress map; the function then waits until the set of keys in
-- the map differs from that snapshot before looking again.
await
  :: WorkGate description
  -> (Map (ThreadId, UUID) description -> IO ())
  -- ^ Callback to report shutdown progress, if you like.
  -> IO ()
await workGate progress = atomically (readTVar running) >>= loop
  where
    running = inProgress workGate

    -- Report the snapshot, then wait for the next one; stop once it is
    -- empty.
    loop seen
      | M.null seen = pass
      | otherwise = do
          progress seen
          atomically (nextSnapshot seen) >>= loop

    -- Retry the transaction for as long as the key set is unchanged, so
    -- the callback only fires when work has been added or removed.
    nextSnapshot seen = do
      current <- readTVar running
      when (M.keys seen == M.keys current) retry
      pure current
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment