Created November 26, 2018 17:55
Idea for stopping work gracefully
module Control.Concurrent.Extras.Gate where
import Protolude
import Data.IORef
import Control.Concurrent.STM
import Data.UUID
import qualified Data.UUID.V4 as UUID
import qualified Data.Map as M
-- | State of a multi-task process to help with graceful termination
data WorkGate description = WorkGate
{ noMoreWorkException :: IORef (Maybe SomeException)
, inProgress :: TVar (Map (ThreadId, UUID) description)
-- | Run a computation, unless the gate is shut, but otherwise keeping
-- track of this computation's execution, by means of the @description@.
gate :: WorkGate description -> description -> IO a -> IO a
-- I don't think @IO a@ needs to become aware of its key. If it does
-- we'll have @gate' :: bla -> ((ThreadId, UUID) -> IO a) -> IO a@.
gate workGate description io = do
acceptWork <- readIORef (noMoreWorkException workGate)
mapM_ throwIO acceptWork
let add :: IO (ThreadId, UUID)
add = do
key <- (,) <$> myThreadId <*> UUID.nextRandom
atomically $ do
modifyTVar (inProgress workGate) (M.insert key description)
pure key
remove :: (ThreadId, UUID) -> IO ()
remove key = do
atomically $ do
modifyTVar (inProgress workGate) (M.insert key description)
bracket add remove $ const io
-- | Stop accepting work. Call 'await' after this.
shut :: WorkGate description
-> SomeException -- ^ Exception that will be thrown when attempting
-- to start more work.
-> IO ()
shut workGate exc = do
atomicWriteIORef (noMoreWorkException workGate) (Just exc)
-- | Does not make sense to call before 'shut'.
await :: WorkGate description
-> (Map (ThreadId, UUID) description
-> IO ()) -- ^ Callback to report shutdown progress
-- if you like
-> IO ()
await workGate progress = go =<< atomically (readTVar (inProgress workGate))
go ip =
if M.null ip
then pass
else do
progress ip
go =<< atomically (do
ip' <- readTVar (inProgress workGate)
when (M.keys ip == M.keys ip')
pure ip'
