Skip to content

Instantly share code, notes, and snippets.

@LSLeary
Last active May 18, 2023 10:19
Show Gist options
  • Save LSLeary/da963da122946d981b4b143cbcf3dd73 to your computer and use it in GitHub Desktop.
Save LSLeary/da963da122946d981b4b143cbcf3dd73 to your computer and use it in GitHub Desktop.
A thread-safe incremental computation (IC) implementation supporting both eager, parallel propagation and lazy, demand-driven evaluation.
{-# LANGUAGE GeneralisedNewtypeDeriving, DerivingVia #-}
{-# LANGUAGE BlockArguments, LambdaCase, NamedFieldPuns #-}
module Control.Concurrent.IC
( Adaptive, adaptively
, static, dynamic
, ICVar, newICVar, newICVarIO
, demand, propagate
, writeICVar , modifyICVar
, writeICVar' , modifyICVar'
, writeICVar'', modifyICVar''
) where
-- base
import Data.Unique (Unique, newUnique)
import Data.Monoid (Ap(..))
import Data.Function (on)
import Data.Functor (void)
import Data.Foldable (for_, traverse_)
import Control.Applicative (Alternative(..))
import Control.Monad (when, unless)
import Control.Monad.Fix (MonadFix(..))
import Control.Concurrent (forkIO)
-- containers
import Data.Set (Set, (\\))
import qualified Data.Set as S
-- transformers
import Control.Monad.Trans.Class (MonadTrans(..))
import Control.Monad.Trans.Reader (ReaderT(..), ask)
-- stm
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TVar
  ( TVar, newTVarIO, readTVar, readTVarIO, writeTVar
  , modifyTVar, modifyTVar'
  )
import Control.Concurrent.STM.TMVar
  ( TMVar, newEmptyTMVarIO, isEmptyTMVar
  , writeTMVar, takeTMVar, readTMVar, tryReadTMVar
  )
-- | The monad in which 'ICVar' recipes are specified.
--
-- It is a reader over 'IO' whose environment ('AdaptEnv') collects the
-- dependencies observed while a recipe runs. The 'Semigroup' and 'Monoid'
-- instances combine results through the 'Applicative' structure (via 'Ap').
newtype Adaptive a = Adaptive (ReaderT AdaptEnv IO a)
  deriving newtype (Functor, Applicative, Alternative, Monad, MonadFix)
  deriving (Semigroup, Monoid) via (Ap Adaptive a)
-- | The environment threaded through an 'Adaptive' computation.
newtype AdaptEnv = AdaptEnv
  { observedDeps :: TVar (Set Interface)
    -- ^ Interfaces recorded by 'observe' while the recipe runs.
  }
-- | Unwrap an 'Adaptive' computation and run it against the given environment.
runAdaptive :: Adaptive a -> AdaptEnv -> IO a
runAdaptive (Adaptive body) env = runReaderT body env
-- | Embed a plain 'IO' action into 'Adaptive'; the environment is ignored.
inIO :: IO a -> Adaptive a
inIO action = Adaptive (lift action)
-- | Run a recipe directly, without going through an 'ICVar'. Nothing is
-- cached or shared as a consequence: the dependencies the recipe observes are
-- collected into a throwaway set that is discarded along with the environment.
adaptively :: Adaptive a -> IO a
adaptively ad = newTVarIO S.empty >>= runAdaptive ad . AdaptEnv
-- | Depend statically upon an 'ICVar'.
--
-- The variable is requested first, so that a dirty variable is recomputed on
-- a background thread while the function-producing recipe runs. Only once that
-- recipe has finished do we block for the variable's value.
static :: ICVar x -> Adaptive (x -> a) -> Adaptive a
static icv@ICVar{value,interface} adxa = do
  inIO (request icv)
  observe interface
  -- Effects of @adxa@ run before we wait on the cached value.
  f <- adxa
  x <- inIO (atomically (readTMVar value))
  pure (f x)
-- | Depend dynamically upon an 'ICVar': record it as a dependency, then
-- 'demand' its value on the current thread.
dynamic :: ICVar a -> Adaptive a
dynamic icv@ICVar{interface} = observe interface *> inIO (demand icv)
-- | Start evaluating an 'ICVar' on a fresh thread if its cache is currently
-- empty; otherwise do nothing. Never blocks on the result.
request :: ICVar x -> IO ()
request icv@ICVar{value} = do
  needsWork <- atomically (isEmptyTMVar value)
  when needsWork $
    void (forkIO (evaluate icv))
-- | Record an 'Interface' as a dependency of the recipe currently running.
--
-- The set is updated strictly: with the lazy 'modifyTVar' every call would
-- stack another unevaluated @S.insert@ on the variable, and 'adaptively'
-- never reads the set, so those thunks would never be forced.
observe :: Interface -> Adaptive ()
observe i = do
  AdaptEnv{observedDeps} <- Adaptive ask
  inIO . atomically $ modifyTVar' observedDeps (S.insert i)
-- | An incrementally computed mutable variable @a@.
data ICVar a = ICVar
  { value :: !(TMVar a)
    -- ^ Cached result; an empty 'TMVar' means the variable is dirty.
  , compute :: !(TVar (Adaptive a))
    -- ^ The recipe currently used to (re)compute the value.
  , interface :: !Interface
    -- ^ Type-erased identity and invalidation/propagation hooks.
  , depends :: !(TVar (Set Interface))
    -- ^ Dependencies observed by the most recent evaluation.
  }

-- | Two 'ICVar's are equal exactly when their interfaces are.
instance Eq (ICVar a) where
  (==) = on (==) interface

-- | Ordered by interface.
instance Ord (ICVar a) where
  compare = on compare interface
-- | Create an @'ICVar' a@ from a recipe, from within another recipe.
-- This is 'newICVarIO' lifted into 'Adaptive'.
newICVar :: Adaptive a -> Adaptive (ICVar a)
newICVar recipe = inIO (newICVarIO recipe)
-- | Create an @'ICVar' a@ from a recipe for computing an @a@ adaptively.
--
-- The variable starts out with an empty cache and no recorded dependencies.
-- 'mfix' ties the knot between the variable and its 'Interface', whose hooks
-- refer back to the variable itself.
newICVarIO :: Adaptive a -> IO (ICVar a)
newICVarIO comp = mfix build
 where
  -- @self@ is the variable under construction and must not be forced here.
  build self = do
    cell     <- newEmptyTMVarIO
    recipe   <- newTVarIO comp
    iface    <- makeInterface self
    lastDeps <- newTVarIO S.empty
    pure ICVar
      { value     = cell
      , compute   = recipe
      , interface = iface
      , depends   = lastDeps
      }
-- | Demand the value of an 'ICVar', triggering its computation and that of its
-- dependencies if necessary.
--
-- If the cache is empty the variable is evaluated on the calling thread and
-- the cache is checked again. The re-check is a non-blocking read in a loop
-- rather than a blocking one: a concurrent invalidation may empty the cache
-- again right after 'evaluate' has filled it, and nothing is guaranteed to
-- refill it afterwards, so blocking there could wait forever. Exceptions
-- thrown by the recipe propagate to the caller.
demand :: ICVar a -> IO a
demand icv@ICVar{value} = loop
 where
  loop = atomically (tryReadTMVar value) >>= \case
    Just x  -> pure x
    -- Dirty (or dirtied again since the last attempt): recompute and retry.
    Nothing -> evaluate icv *> loop
-- | Run an 'ICVar's current recipe, cache the result and bring the dependency
-- graph up to date.
--
-- The recipe runs outside of any transaction, against a fresh set in which
-- the dependencies it observes are collected. A single transaction then
-- stores the result, records the new dependency set, and adjusts the
-- reverse-dependency sets of exactly those dependencies that were dropped or
-- gained since the previous evaluation.
--
-- NOTE(review): the result is stored unconditionally. An invalidation that
-- arrives while the recipe is still running finds the cache already empty and
-- so leaves no trace; confirm whether callers rely on that being detected.
evaluate :: ICVar a -> IO ()
evaluate ICVar{value,compute,interface,depends} = do
  comp         <- readTVarIO compute
  observedDeps <- newTVarIO S.empty
  result       <- runAdaptive comp AdaptEnv{observedDeps}
  atomically do
    writeTMVar value result
    olddeps <- readTVar depends
    newdeps <- readTVar observedDeps
    writeTVar depends newdeps
    let deletions  = olddeps \\ newdeps
        insertions = newdeps \\ olddeps
    -- Update strictly: a pending lazy @S.delete@/@S.insert@ would hold on to
    -- the previous set (including entries just removed) until the variable
    -- is next read.
    for_ deletions \Interface{revdeps} ->
      modifyTVar' revdeps (S.delete interface)
    for_ insertions \Interface{revdeps} ->
      modifyTVar' revdeps (S.insert interface)
-- | Recompute an 'ICVar' and its transitive reverse dependencies in the
-- background, by running the propagation hook stored in its 'Interface'.
propagate :: ICVar a -> IO ()
propagate icv = propagate_ (interface icv)
-- | Replace an 'ICVar's recipe lazily.
writeICVar :: ICVar a -> Adaptive a -> IO ()
writeICVar icv recipe = modifyICVar icv (const recipe)

-- | Replace an 'ICVar's recipe and recompute it in the background.
writeICVar' :: ICVar a -> Adaptive a -> IO ()
writeICVar' icv recipe = modifyICVar' icv (const recipe)

-- | Replace an 'ICVar's recipe and 'propagate' the change.
writeICVar'' :: ICVar a -> Adaptive a -> IO ()
writeICVar'' icv recipe = modifyICVar'' icv (const recipe)
-- | Modify an 'ICVar's recipe lazily: in one transaction the recipe is
-- updated and the variable, along with its transitive reverse dependencies,
-- is marked dirty. Nothing is recomputed.
modifyICVar :: ICVar a -> (Adaptive a -> Adaptive a) -> IO ()
modifyICVar ICVar{compute,interface} f =
  atomically (modifyTVar compute f *> dirty interface)

-- | Modify an 'ICVar's recipe and recompute it in the background.
modifyICVar' :: ICVar a -> (Adaptive a -> Adaptive a) -> IO ()
modifyICVar' icv f =
  modifyICVar icv f *> void (forkIO (evaluate icv))

-- | Modify an 'ICVar's recipe and 'propagate' the change.
modifyICVar'' :: ICVar a -> (Adaptive a -> Adaptive a) -> IO ()
modifyICVar'' icv f =
  modifyICVar icv f *> propagate icv
-- | The type-erased face of an 'ICVar': its identity, its reverse
-- dependencies, and the hooks used to invalidate and re-evaluate it.
data Interface = Interface
  { identifier :: !Unique
    -- ^ Identity; the sole basis for the 'Eq' and 'Ord' instances.
  , revdeps :: !(TVar (Set Interface))
    -- ^ Interfaces of the variables whose latest evaluation observed this one.
  , dirty :: STM ()
    -- ^ Empty the owner's cache and, if it was not already empty, do the
    -- same for its reverse dependencies. Left lazy: it refers back to the
    -- variable under construction.
  , propagate_ :: IO ()
    -- ^ Request background evaluation of the owner, then run the same hook
    -- on each reverse dependency. Left lazy for the same reason.
  }

instance Eq Interface where
  (==) = on (==) identifier

instance Ord Interface where
  compare = on compare identifier
-- | Build the 'Interface' for an 'ICVar' that is still under construction.
--
-- The argument is matched lazily and only inspected when one of the hooks is
-- actually run, because 'newICVarIO' passes in the very variable whose
-- 'interface' field this function is producing.
makeInterface :: ICVar a -> IO Interface
makeInterface ~icv@ICVar{value,interface} = do
  identifier <- newUnique
  revdeps    <- newTVarIO S.empty
  pure Interface
    { identifier
    , revdeps
    , dirty      = invalidate
    , propagate_ = pushForward
    }
 where
  -- Reverse dependencies, reached through the variable's own interface.
  -- (Do-bound names are not in scope here, so @revdeps@ is the selector.)
  dependents = revdeps interface

  -- Empty the cache; if it was already empty, everything downstream has been
  -- invalidated before and there is nothing more to do.
  invalidate = do
    alreadyDirty <- isEmptyTMVar value
    unless alreadyDirty do
      _ <- takeTMVar value
      readTVar dependents >>= traverse_ dirty

  -- Kick off evaluation of this variable, then of each reverse dependency.
  pushForward = do
    request icv
    readTVarIO dependents >>= traverse_ propagate_
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment