Last active
May 18, 2023 10:19
-
-
Save LSLeary/da963da122946d981b4b143cbcf3dd73 to your computer and use it in GitHub Desktop.
A thread-safe incremental computation (IC) implementation supporting both eager, parallel propagation and lazy, demand-driven evaluation.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE GeneralisedNewtypeDeriving, DerivingVia #-} | |
{-# LANGUAGE BlockArguments, LambdaCase, NamedFieldPuns #-} | |
module Control.Concurrent.IC | |
( Adaptive, adaptively | |
, static, dynamic | |
, ICVar, newICVar, newICVarIO | |
, demand, propagate | |
, writeICVar , modifyICVar | |
, writeICVar' , modifyICVar' | |
, writeICVar'', modifyICVar'' | |
) where | |
-- base
import Data.Unique (Unique, newUnique)
import Data.Monoid (Ap(..))
import Data.Function (on)
import Data.Functor (void)
import Data.Foldable (for_, traverse_)
import Control.Applicative (Alternative(..))
import Control.Monad (when, unless)
import Control.Monad.Fix (MonadFix(..))
import Control.Concurrent (forkIO)

-- containers
import Data.Set (Set, (\\))
import qualified Data.Set as S

-- transformers
import Control.Monad.Trans.Class (MonadTrans(..))
import Control.Monad.Trans.Reader (ReaderT(..), ask)

-- stm
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TVar
  ( TVar, newTVarIO, readTVar, readTVarIO, writeTVar
  , modifyTVar, modifyTVar'
  )
import Control.Concurrent.STM.TMVar
  ( TMVar, newEmptyTMVarIO, isEmptyTMVar
  , writeTMVar, takeTMVar, readTMVar, tryReadTMVar
  )

-- | The monad in which 'ICVar' recipes are specified.
--
-- It is a newtype over @'ReaderT' 'AdaptEnv' 'IO'@ and reuses that type's
-- monadic instances. The 'Semigroup' and 'Monoid' instances are borrowed from
-- 'Ap', i.e. results are combined applicatively.
newtype Adaptive a = Adaptive (ReaderT AdaptEnv IO a)
  deriving newtype
    ( Functor
    , Applicative
    , Alternative
    , Monad
    , MonadFix
    )
  deriving (Semigroup, Monoid) via (Ap Adaptive a)

-- | The environment a recipe runs in.
newtype AdaptEnv = AdaptEnv
  { observedDeps :: TVar (Set Interface)
    -- ^ Interfaces recorded via 'observe' while the recipe runs.
  }

-- | Unwrap a recipe and run it in 'IO' against the given environment.
runAdaptive :: Adaptive a -> AdaptEnv -> IO a
runAdaptive (Adaptive body) env = runReaderT body env

-- | Embed a plain 'IO' action in a recipe; the environment is not consulted.
inIO :: IO a -> Adaptive a
inIO action = Adaptive (lift action)

-- | Sidestep the 'ICVar' to execute a recipe directly. Note that, consequently,
-- the result is not cached or shared.
--
-- The recipe runs against a fresh, empty set of observed dependencies, which
-- is discarded once the recipe finishes.
adaptively :: Adaptive a -> IO a
adaptively ad =
  newTVarIO S.empty >>= \observedDeps ->
    runAdaptive ad AdaptEnv{ observedDeps = observedDeps }

-- | Depend statically upon an 'ICVar'.
--
-- Steps, in order: ask for the variable to be evaluated in the background if
-- it has no cached value, record the dependency, run the function-producing
-- recipe, then wait for the variable's value and apply the function to it.
--
-- NOTE(review): the final read blocks until the cache is filled. If the
-- variable is emptied again after 'request' saw it filled, nothing in this
-- function restarts its evaluation -- confirm that another thread will.
static :: ICVar x -> Adaptive (x -> a) -> Adaptive a
static icv@ICVar{value,interface} adxa = do
  inIO (request icv)
  observe interface
  apply  <- adxa
  cached <- inIO (atomically (readTMVar value))
  pure (apply cached)

-- | Depend dynamically upon an 'ICVar'.
--
-- The dependency is recorded first; the variable's value is then obtained
-- with 'demand' on the calling thread.
dynamic :: ICVar a -> Adaptive a
dynamic icv = observe (interface icv) *> inIO (demand icv)

-- | Start evaluating an 'ICVar' on a new thread if it currently has no cached
-- value; otherwise do nothing. Does not wait for the evaluation to finish.
request :: ICVar x -> IO ()
request icv = do
  needsEval <- atomically (isEmptyTMVar (value icv))
  when needsEval $
    void (forkIO (evaluate icv))

-- | Record an 'Interface' in the running recipe's set of observed
-- dependencies.
observe :: Interface -> Adaptive ()
observe i = do
  AdaptEnv{observedDeps} <- Adaptive ask
  -- Strict update: the set is only read after the recipe has finished (or
  -- never, under 'adaptively'), so a lazy 'modifyTVar' would pile up one
  -- pending insert per call instead of keeping a set bounded by the number of
  -- distinct dependencies.
  inIO . atomically $ modifyTVar' observedDeps (S.insert i)

-- | An incrementally computed mutable variable @a@.
data ICVar a = ICVar
  { value     :: !(TMVar a)
    -- ^ Cached result; empty while no up-to-date value is available.
  , compute   :: !(TVar (Adaptive a))
    -- ^ The current recipe.
  , interface :: !Interface
    -- ^ Handle through which other variables track and invalidate this one.
  , depends   :: !(TVar (Set Interface))
    -- ^ Dependencies recorded by the most recent completed evaluation.
  }

-- | Two 'ICVar's are equal exactly when their interfaces are.
instance Eq (ICVar a) where
  lhs == rhs = interface lhs == interface rhs

-- | 'ICVar's are ordered by their interfaces.
instance Ord (ICVar a) where
  compare lhs rhs = compare (interface lhs) (interface rhs)

-- | Given a recipe for computing an @a@ adaptively, produce an @'ICVar' a@
-- in a recipe context. This is 'newICVarIO' embedded in 'Adaptive'.
newICVar :: Adaptive a -> Adaptive (ICVar a)
newICVar comp = inIO (newICVarIO comp)

-- | Given a recipe for computing an @a@ adaptively, produce an @'ICVar' a@.
--
-- The new variable starts with an empty cache and an empty dependency set;
-- the recipe is not run here.
newICVarIO :: Adaptive a -> IO (ICVar a)
newICVarIO comp = mfix $ \self -> do
  cache  <- newEmptyTMVarIO
  recipe <- newTVarIO comp
  -- 'self' is the variable under construction; it is handed over unevaluated
  -- so the interface can refer back to it.
  iface  <- makeInterface self
  deps   <- newTVarIO S.empty
  pure ICVar
    { value     = cache
    , compute   = recipe
    , interface = iface
    , depends   = deps
    }

-- | Demand the value of an 'ICVar', triggering its computation and that of its
-- dependencies if necessary.
--
-- If a cached value is present it is returned immediately. Otherwise the
-- variable is evaluated on the calling thread and the cache is checked again.
-- The re-check matters: the cache can be emptied by a concurrent recipe change
-- between the end of 'evaluate' and our read. A blocking read would then wait
-- for an evaluation that nobody may ever start, so we evaluate again instead.
demand :: ICVar a -> IO a
demand icv@ICVar{value} = atomically (tryReadTMVar value) >>= \case
  Just x  -> pure x
  Nothing -> evaluate icv *> demand icv

-- | Run an 'ICVar's current recipe and publish the outcome.
--
-- The recipe runs against a fresh dependency tracker. Afterwards, in a single
-- transaction, the result is stored in the cache, the recorded dependency set
-- replaces the previous one, and this variable's interface is removed from the
-- reverse-dependency sets of dropped dependencies and added to those of newly
-- acquired ones.
--
-- NOTE(review): the recipe is read once, up front, and the result is written
-- unconditionally; a recipe replaced while this evaluation is in flight is not
-- detected here -- confirm whether callers depend on a later re-evaluation.
evaluate :: ICVar a -> IO ()
evaluate ICVar{value,compute,interface,depends} = do
  comp         <- readTVarIO compute
  observedDeps <- newTVarIO S.empty
  result       <- runAdaptive comp AdaptEnv{observedDeps}
  atomically $ do
    writeTMVar value result
    olddeps <- readTVar depends
    newdeps <- readTVar observedDeps
    writeTVar depends newdeps
    let deletions  = olddeps \\ newdeps
        insertions = newdeps \\ olddeps
    -- Strict updates: these sets are long-lived and may go unread for a long
    -- time; a lazy 'modifyTVar' would leave pending inserts/deletes that keep
    -- already-removed dependents reachable.
    for_ deletions $ \Interface{revdeps} ->
      modifyTVar' revdeps (S.delete interface)
    for_ insertions $ \Interface{revdeps} ->
      modifyTVar' revdeps (S.insert interface)

-- | Compute an 'ICVar's transitive reverse dependencies in the background.
--
-- Runs the propagation action stored in the variable's interface.
propagate :: ICVar a -> IO ()
propagate icv = propagate_ (interface icv)

-- | Replace an 'ICVar's recipe lazily.
writeICVar :: ICVar a -> Adaptive a -> IO ()
writeICVar icv recipe = modifyICVar icv (const recipe)

-- | Replace an 'ICVar's recipe and recompute it in the background.
writeICVar' :: ICVar a -> Adaptive a -> IO ()
writeICVar' icv recipe = modifyICVar' icv (const recipe)

-- | Replace an 'ICVar's recipe and 'propagate' the change.
writeICVar'' :: ICVar a -> Adaptive a -> IO ()
writeICVar'' icv recipe = modifyICVar'' icv (const recipe)

-- | Modify an 'ICVar's recipe lazily.
--
-- In one transaction the stored recipe is updated and the variable's
-- interface is asked to mark it dirty; nothing is recomputed here.
modifyICVar :: ICVar a -> (Adaptive a -> Adaptive a) -> IO ()
modifyICVar icv f = atomically $
  modifyTVar (compute icv) f >> dirty (interface icv)

-- | Modify an 'ICVar's recipe and recompute it in the background.
modifyICVar' :: ICVar a -> (Adaptive a -> Adaptive a) -> IO ()
modifyICVar' icv f =
  modifyICVar icv f >> void (forkIO (evaluate icv))

-- | Modify an 'ICVar's recipe and 'propagate' the change.
modifyICVar'' :: ICVar a -> (Adaptive a -> Adaptive a) -> IO ()
modifyICVar'' icv f =
  modifyICVar icv f >> propagate icv

-- | The type-independent face of an 'ICVar', used for dependency tracking.
--
-- The two action fields are intentionally left lazy: they are built from the
-- 'ICVar' that is still under construction when the interface is created.
data Interface = Interface
  { identifier :: !Unique
    -- ^ Identity; the 'Eq' and 'Ord' instances compare only this.
  , revdeps    :: !(TVar (Set Interface))
    -- ^ Interfaces of the variables whose last evaluation observed this one.
  , dirty      :: STM ()
    -- ^ Drop the cached value and, if there was one, dirty the 'revdeps' too.
  , propagate_ :: IO ()
    -- ^ Request evaluation of this variable, then of its 'revdeps'.
  }

-- | Interfaces are equal exactly when their identifiers are.
instance Eq Interface where
  (==) = on (==) identifier

-- | Interfaces are ordered by identifier.
instance Ord Interface where
  compare = on compare identifier

-- | Build the 'Interface' for an 'ICVar'.
--
-- The argument is matched irrefutably and only used inside the returned
-- actions, so it may be a variable that is not fully constructed yet (as it
-- is when called from 'newICVarIO').
makeInterface :: ICVar a -> IO Interface
makeInterface ~icv@ICVar{value,interface} = do
  identifier <- newUnique
  revdeps    <- newTVarIO S.empty
  pure Interface
    { identifier
    , revdeps
    , dirty      = invalidate
    , propagate_ = spread
    }
  where
    -- Reverse dependencies as seen through the variable's own interface;
    -- a lazy binding, so nothing is forced until one of the actions runs.
    Interface{revdeps = dependents} = interface

    -- Empty the cache; only when a value was actually present are the
    -- reverse dependencies dirtied in turn.
    invalidate :: STM ()
    invalidate = do
      alreadyDirty <- isEmptyTMVar value
      unless alreadyDirty $ do
        void (takeTMVar value)
        readTVar dependents >>= traverse_ dirty

    -- Request evaluation of this variable, then run the propagation action
    -- of every reverse dependency.
    spread :: IO ()
    spread = do
      request icv
      readTVarIO dependents >>= traverse_ propagate_
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment