Skip to content

Instantly share code, notes, and snippets.

@roberth
Created November 26, 2018 17:55
Show Gist options
  • Save roberth/7f5fae2178e7574eb7dcdd0dc6e32246 to your computer and use it in GitHub Desktop.
Save roberth/7f5fae2178e7574eb7dcdd0dc6e32246 to your computer and use it in GitHub Desktop.
Idea for stopping work gracefully
module Control.Concurrent.Extras.Gate where
import Protolude
import Data.IORef
import Control.Concurrent.STM
import Data.UUID
import qualified Data.UUID.V4 as UUID
import qualified Data.Map as M
-- | State of a multi-task process to help with graceful termination
data WorkGate description = WorkGate
{ noMoreWorkException :: IORef (Maybe SomeException)
, inProgress :: TVar (Map (ThreadId, UUID) description)
}
-- | Run a computation, unless the gate is shut, but otherwise keeping
-- track of this computation's execution, by means of the @description@.
gate :: WorkGate description -> description -> IO a -> IO a
-- I don't think @IO a@ needs to become aware of its key. If it does
-- we'll have @gate' :: bla -> ((ThreadId, UUID) -> IO a) -> IO a@.
gate workGate description io = do
acceptWork <- readIORef (noMoreWorkException workGate)
mapM_ throwIO acceptWork
let add :: IO (ThreadId, UUID)
add = do
key <- (,) <$> myThreadId <*> UUID.nextRandom
atomically $ do
modifyTVar (inProgress workGate) (M.insert key description)
pure key
remove :: (ThreadId, UUID) -> IO ()
remove key = do
atomically $ do
modifyTVar (inProgress workGate) (M.insert key description)
bracket add remove $ const io
-- | Stop accepting work. Call 'await' after this.
shut :: WorkGate description
-> SomeException -- ^ Exception that will be thrown when attempting
-- to start more work.
-> IO ()
shut workGate exc = do
atomicWriteIORef (noMoreWorkException workGate) (Just exc)
-- | Does not make sense to call before 'shut'.
await :: WorkGate description
-> (Map (ThreadId, UUID) description
-> IO ()) -- ^ Callback to report shutdown progress
-- if you like
-> IO ()
await workGate progress = go =<< atomically (readTVar (inProgress workGate))
where
go ip =
if M.null ip
then pass
else do
progress ip
go =<< atomically (do
ip' <- readTVar (inProgress workGate)
when (M.keys ip == M.keys ip')
retry
pure ip'
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment