Created
June 5, 2020 12:28
-
-
Save turion/c775d388771fcf95f86dc52cefed8898 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE Arrows #-} | |
{-# LANGUAGE DefaultSignatures #-} | |
{-# LANGUAGE GADTs #-} | |
{-# LANGUAGE TupleSections #-} | |
-- base | |
import Control.Arrow
import Control.Category
import Control.Concurrent
import Control.Exception (SomeException, mask, onException, throwIO, try)
import Control.Monad (guard, (>=>), void)
import Data.Data
import Prelude hiding ((.), id)
-- | Lift an arrow on elements to an arrow on lists.
--   Any 'ArrowChoice' can do this: the choice is between the empty list
--   and a cons cell, and the tail is handled by recursion.
--
--   The head element is processed before the tail.
--   NOTE(review): the definition recurses at the level of the arrow itself,
--   so for arrows that are data structures this presumably unfolds without end.
mapArr :: ArrowChoice arr => arr a b -> arr [a] [b]
mapArr elemArr = proc xs ->
  case xs of
    [] -> returnA -< []
    x : rest -> do
      y <- elemArr -< x
      ys <- mapArr elemArr -< rest
      returnA -< y : ys
-- * Parallel cells

-- | A syntax tree for arrows with effects in @m@ and internal state.
--   Basically a free arrow construction: every arrow combinator is kept
--   as a constructor, so the structure can be inspected and rearranged
--   before it is executed.
data PCell m a b where
  -- Primitives

  -- | Pass the input through unchanged.
  Id :: PCell m a a
  -- | Run an effectful function on the input.
  ArrM :: (a -> m b) -> PCell m a b

  -- Composition

  -- | Sequential composition: feed the output of the first cell to the second.
  Seq :: PCell m a b -> PCell m b c -> PCell m a c
  -- | Two cells side by side, each on one component of a pair.
  Par :: PCell m a b -> PCell m c d -> PCell m (a, c) (b, d)
  -- | Apply a cell to the first component of a pair and keep the second.
  --   There is no @Second@ because it's not used in desugaring.
  First :: PCell m a b -> PCell m (a, c) (b, c)
  -- | Pick a cell depending on whether the input is 'Left' or 'Right'.
  Choice :: PCell m a b -> PCell m c d -> PCell m (Either a c) (Either b d)

  -- State

  -- State :: Data s => s -> PCell m (Maybe s) s
  -- State :: Data s => s -> (s -> a -> m (b, s)) -> PCell m a b

  -- | A loop carrying a state of type @s@, starting from the given value.
  --   The 'Data' constraint is important for livecoding.
  Feedback :: Data s => s -> PCell m (a, s) (b, s) -> PCell m a b

  -- Re-association of nested pairs. Annoying, but seems necessary.

  -- | Shift the nesting of a triple to the left.
  AssocL :: PCell m (a, (b, c)) ((a, b), c)
  -- | Shift the nesting of a triple to the right.
  AssocR :: PCell m ((a, b), c) (a, (b, c))
-- | Render the structure of a cell as a string, for debugging.
--   Leaves are shown by constructor name; composite cells as
--   @(Constructor child1 child2)@. Functions inside 'ArrM' and the state
--   inside 'Feedback' are not shown.
inspect :: PCell m a b -> String
inspect cell = case cell of
  Id -> "Id"
  ArrM _ -> "ArrM"
  Par left right -> node "Par" [inspect left, inspect right]
  First inner -> node "First" [inspect inner]
  Seq before after -> node "Seq" [inspect before, inspect after]
  Choice onLeft onRight -> node "Choice" [inspect onLeft, inspect onRight]
  Feedback _ inner -> node "Feedback" [inspect inner]
  AssocL -> "AssocL"
  AssocR -> "AssocR"
  where
    -- Parenthesised constructor name followed by its space-separated children.
    node :: String -> [String] -> String
    node label children = "(" ++ unwords (label : children) ++ ")"
-- | Identity and composition are represented syntactically by 'Id' and 'Seq'.
instance Monad m => Category (PCell m) where
  id = Id
  -- '.' composes right-to-left, 'Seq' left-to-right, hence the swapped arguments.
  after . before = Seq before after
-- | Pure functions become effect-free 'ArrM' leaves;
--   '***' and 'first' map directly to their constructors.
--   All other methods use the class defaults.
instance Monad m => Arrow (PCell m) where
  arr f = ArrM $ \a -> return (f a)
  left *** right = Par left right
  first cell = First cell
-- | '+++' is represented syntactically by 'Choice';
--   the remaining methods use the class defaults.
instance Monad m => ArrowChoice (PCell m) where
  onLeft +++ onRight = Choice onLeft onRight
-- -- | To build up state: An initialized feedback loop. | |
-- feedback | |
-- :: Data s | |
-- => s | |
-- -> PCell m (a, s) (b, s) | |
-- -> PCell m a b | |
-- feedback s0 cell = proc a -> do | |
-- sLast <- State s0 -< s | |
-- * Optimization

-- | This is the workhorse.
--   Rearrange the cell in such a way as to optimally use parallelism,
--   and apply other efficiency tweaks.
--   This means to increase the length of sequential compositions,
--   and make the constituents of parallel compositions as independent as possible.
--
--   The basic idea is similar to @ApplicativeDo@:
--   Every time we use several arguments at the same time,
--   this creates a fence in the parallel execution.
--   A lot of these fences are unnecessary,
--   for example when two separate functions create the element of a tuple,
--   and then two other separate functions use each element of the tuple independently.
--
--   The equations below are tried from top to bottom, so their order matters:
--   specific rewrites come first, structural recursion and the catch-all last.
--   A single pass does not necessarily reach a fixed point
--   (see the TODO in the recursion section), so callers may apply it repeatedly.
--
--   Caution: This function does not preserve the guarantee that in @cell1 *** cell2@,
--   the effects of @cell1@ are executed before @cell2@!
optimize
  :: Monad m
  => PCell m a b
  -> PCell m a b
-- This is the main optimization:
-- Instead of first executing two cells in parallel,
-- collecting their results in a tuple and then dividing the tuple again,
-- execute both lines in parallel.
optimize (Seq (Par cell1L cell1R) (Par cell2L cell2R))
  = optimize $ Par (Seq cell1L cell2L) (Seq cell1R cell2R)
-- Efficiency tweaks: Fuse atomic operations together
optimize (Seq Id cell) = optimize cell
optimize (Seq cell Id) = optimize cell
optimize (Seq (ArrM f) (ArrM g)) = ArrM $ f >=> g
-- optimize (ArrM f) (State s0 g) = State s $ _
-- optimize (State s0 f) (ArrM g) = State s $ _
optimize (Par (ArrM f) Id) = ArrM $ \(a, c) -> (, c) <$> f a
optimize (Par Id (ArrM f)) = ArrM $ \(a, c) -> (a, ) <$> f c
-- Normalization: Group combinators on the left side and try to optimize
optimize (Seq cell1 (Seq cell2 cell3))
  -- Can we maybe optimize the second part individually?
  = case optimize (Seq cell2 cell3) of
      -- No huge change. Group to the left and try to optimize.
      Seq cell2' cell3' -> optimize $ Seq (Seq cell1 cell2') cell3'
      -- Something moved considerably.
      cell23 -> optimize $ Seq cell1 cell23
optimize (Par cell1 (Par cell2 cell3)) = optimize $ Seq (Seq AssocL (Par (Par cell1 cell2) cell3)) AssocR
-- Try to get rid of the associators again
optimize (Seq AssocR AssocL) = Id
optimize (Seq AssocL AssocR) = Id
-- Fuse and normalize feedback loops: Move feedback/state as far out as possible.
optimize (Feedback s1 (Feedback s2 cell)) = optimize $ Feedback (s1, s2) $ Seq (Seq AssocL cell) AssocR
optimize (Seq cell1 (Feedback s cell2)) = optimize $ Feedback s $ Seq (Par cell1 Id) cell2
optimize (Seq (Feedback s cell1) cell2) = optimize $ Feedback s $ Seq cell1 (Par cell2 Id)
optimize (Par cell1 (Feedback s cell2)) = optimize $ Feedback s $ Seq (Seq AssocR $ Par cell1 cell2) AssocL
-- TODO To get this one to run I need to introduce Swap as a primitive as well
-- optimize (Par (Feedback s cell1) cell2) = optimize $ Feedback s $ Seq (Seq AssocR _) AssocL
-- Recursion: We didn't find immediate optimizations,
-- let's dig deeper and see what we can find there.
--
-- TODO: I'd really like to apply optimize again to the result,
-- but that's a circle.
-- I could return, from each individual optimization,
-- whether some optimization took place,
-- and if yes, optimize the whole thing again.
optimize (Seq cell1 cell2) = Seq (optimize cell1) (optimize cell2)
optimize (Par cell1 cell2) = Par (optimize cell1) (optimize cell2)
optimize (Feedback s cell) = Feedback s $ optimize cell
optimize (First cell) = optimize $ Par cell Id
-- Both branches of a choice are optimized independently.
-- Without this equation the catch-all would return the whole 'Choice'
-- untouched, so nothing nested below a choice would ever be optimized.
optimize (Choice cell1 cell2) = Choice (optimize cell1) (optimize cell2)
-- TODO Optimizations for ArrowChoice beyond plain recursion
-- Catch-all: We didn't find any further optimization opportunities.
optimize cell = cell
-- * Parallel monads

-- | 'Applicative's that can run two computations in parallel.
--   'runPar' gives no guarantee on which of the two is executed first.
class Applicative m => ApplicativePar m where
  -- | Run both computations at the same time and return both results.
  --   Like async's @concurrently@.
  runPar :: m a -> m b -> m (a, b)
  -- Fallback available to every 'Applicative': pair the results up
  -- via '<*>'. Correct, but not necessarily fast.
  runPar left right = fmap (,) left <*> right
-- | Launch a separate thread for each computation and collect the results in 'MVar's.
--
--   Each worker stores either its result or the exception it died with,
--   so the caller can never block forever on an 'MVar' that nobody fills.
--   An exception from either computation is rethrown in the calling thread,
--   after both workers have been killed. The workers are also killed
--   if the caller itself is interrupted while waiting.
instance ApplicativePar IO where
  runPar a b = mask $ \restore -> do
    varA <- newEmptyMVar
    varB <- newEmptyMVar
    -- The workers inherit the masked state, so the final 'putMVar' cannot be
    -- skipped; only the user computation itself runs with exceptions restored.
    threadA <- forkIO $ try (restore a) >>= putMVar varA
    threadB <- forkIO $ try (restore b) >>= putMVar varB
    let
      -- Killing a thread that has already finished is a no-op.
      killBoth :: IO ()
      killBoth = killThread threadA >> killThread threadB
      -- Wait for one worker; propagate its exception, if any.
      await :: MVar (Either SomeException r) -> IO r
      await var = do
        outcome <- restore (takeMVar var) `onException` killBoth
        either (\e -> killBoth >> throwIO e) return outcome
    resultA <- await varA
    resultB <- await varB
    return (resultA, resultB)
-- * Parallel execution

-- | Feed a single input event to a cell,
--   using parallelism to execute it.
--
--   Returns the output together with the cell to use for the next input;
--   only 'Feedback' nodes actually change between steps
--   (they carry the updated state).
stepPCell
  :: (Monad m, ApplicativePar m)
  => PCell m a b
  -> a
  -> m (b, PCell m a b)
-- Leaves
stepPCell Id input = return (input, Id)
stepPCell self@(ArrM action) input = do
  output <- action input
  return (output, self)
stepPCell AssocL (a, (b, c)) = return (((a, b), c), AssocL)
stepPCell AssocR ((a, b), c) = return ((a, (b, c)), AssocR)
-- This is the whole point.
-- Execute parallel cells using the parallelism capabilities of the monad.
stepPCell (Par left right) (inL, inR) = do
  ((outL, left'), (outR, right')) <- runPar (stepPCell left inL) (stepPCell right inR)
  return ((outL, outR), Par left' right')
-- Only the first component is processed; the second is passed along.
stepPCell (First inner) (input, passed) = do
  (output, inner') <- stepPCell inner input
  return ((output, passed), First inner')
-- Sequential composition: strictly one cell after the other.
stepPCell (Seq before after) input = do
  (middle, before') <- stepPCell before input
  (output, after') <- stepPCell after middle
  return (output, Seq before' after')
-- Only the selected branch is stepped; the other one is kept as it was.
stepPCell (Choice onLeft onRight) input = case input of
  Left inL -> do
    (outL, onLeft') <- stepPCell onLeft inL
    return (Left outL, Choice onLeft' onRight)
  Right inR -> do
    (outR, onRight') <- stepPCell onRight inR
    return (Right outR, Choice onLeft onRight')
-- stepPCell (State s f) a = do
--   (b, s') <- f a
--   return (b, State s' f)
-- Thread the current state through the inner cell and store the new one.
stepPCell (Feedback state inner) input = do
  ((output, state'), inner') <- stepPCell inner (input, state)
  return (output, Feedback state' inner')
-- | Feed a stream of inputs into a 'PCell', one after the other,
--   and collect the outputs in the same order.
--   Each step continues with the cell returned by the previous step.
runPCell
  :: (Monad m, ApplicativePar m)
  => PCell m a b
  -> [a]
  -> m [b]
runPCell cell inputs = case inputs of
  [] -> return []
  input : rest -> do
    (output, cell') <- stepPCell cell input
    (output :) <$> runPCell cell' rest
-- | Like 'runPCell', but discard the outputs and keep only the effects.
--
--   The cell is stepped directly instead of going through 'runPCell',
--   so no output list is accumulated only to be thrown away.
--   The effects happen in the same order as with 'runPCell'.
runPCell_
  :: (Monad m, ApplicativePar m)
  => PCell m a b
  -> [a]
  -> m ()
runPCell_ _ [] = return ()
runPCell_ cell (a : as) = do
  (_, cell') <- stepPCell cell a
  runPCell_ cell' as
-- * Caching

type Hash = Int

-- | Values that can be summarised as a 'Hash'.
--   Two equal values must have the same hash; different values may collide.
--
--   I'm sure there's proper libraries for this kind of thing out there.
class Hashable a where
  hash :: a -> Hash
  -- Generic default for every type with a 'Data' instance, so an empty
  -- instance declaration yields a working (if not particularly fast) hash.
  default hash :: Data a => a -> Hash
  hash = dataHash

-- | Structural hash of any 'Data' value: combine the hash of the
--   constructor's textual representation with the hashes of all
--   immediate subterms, in order.
--
--   NOTE(review): this relies on 'toConstr' being defined for the type;
--   some abstract types in base define it as an error. Confirm before
--   using the default for such types.
dataHash :: Data d => d -> Hash
dataHash value =
  foldr mix (labelHash (showConstr (toConstr value))) (gmapQ dataHash value)
  where
    -- Order-sensitive polynomial combination; overflow simply wraps around.
    mix childHash acc = acc * 31 + childHash
    labelHash = foldr (\char acc -> acc * 31 + fromEnum char) 7
-- Ok, I'm cheating here: an 'Int' simply is its own hash.
instance Hashable Int where
  hash n = n
-- | If the inputs haven't changed, don't recompute.
--
--   Wraps the cell in a 'Feedback' loop whose state remembers the last
--   output together with the hash of the input that produced it.
--   When the next input has the same hash, the remembered output is
--   returned and the wrapped cell (including its effects) is skipped.
cached
  :: (Monad m, Data b, Hashable a)
  => PCell m a b
  -> PCell m a b
cached cell = Feedback Nothing $ proc (input, memo) -> do
  let inputHash = hash input
  case validateCache memo inputHash of
    -- Nothing usable remembered: run the cell and remember its result.
    Nothing -> do
      output <- cell -< input
      let memo' = Just (output, inputHash)
      returnA -< (output, memo')
    -- Same input hash as last time: reuse the output, keep the memo.
    Just output -> returnA -< (output, memo)
-- | Look up whether the cache is still valid,
--   and if yes, return its content.
--   The cache is valid when it is non-empty and its stored hash
--   equals the given one.
validateCache :: Maybe (b, Hash) -> Hash -> Maybe b
validateCache maybeCache newHash =
  maybeCache >>= \(content, oldHash) -> content <$ guard (oldHash == newHash)
-- * Sundry | |
-- TODO Quickcheck that 'optimize' preserves semantics and is idempotent | |
-- instance (Coarbitrary a, Arbitrary b) => Arbitrary (PCell m) where | |
-- * Try it out!

-- | Run a handful of demos, each preceded by a headline.
main :: IO ()
main = do
  demo "Pure function *2:" $
    print =<< runPCell (arr (*2)) [1,2,3::Int]
  demo "Side effects:" $
    runPCell_ (ArrM print <<< arr (*2)) [1,1,2,2,3::Int]
  demo "Caching. Will not print duplicates:" $
    runPCell_ (cached $ ArrM print <<< arr (*2)) [1,1,2,2,3::Int]
  demo "Unoptimized:" $
    runPCell_ mainCell [("Ugh,", "this", "takes", "forever!")]
  demo "Optimized:" $
    runPCell_ (optimize $ optimize $ optimize mainCell) [("Oh,", "going", "pretty", "fast!")]
  where
    -- Print the headline, then run the demo.
    demo :: String -> IO () -> IO ()
    demo title action = putStrLn title >> action
-- | Block for some time, print the value and then return it unchanged.
--   The cell is run through 'optimize' three times up front.
longComputation :: Show a => PCell IO a a
longComputation = optimize $ optimize $ optimize $ proc value -> do
  _ <- ArrM threadDelay -< delayMicros
  _ <- ArrM print -< value
  returnA -< value
  where
    -- Argument passed to 'threadDelay' (microseconds, i.e. one second).
    delayMicros :: Int
    delayMicros = 1000000
-- FIXME mapArr recurses and thus builds up an infinite cell.
-- mainCell :: PCell IO [String] [()]
-- mainCell = mapArr $ longComputation 1000000 >>> ArrM putStrLn

-- | Run 'longComputation' on each of the four inputs, one statement
--   after the other, and discard the results.
mainCell :: PCell IO (String, String, String, String) ()
mainCell = proc (a, b, c, d) -> do
  longComputation -< a
  longComputation -< b
  longComputation -< c
  longComputation -< d
  returnA -< ()
-- | Two-input variant of 'mainCell', written in arrow notation:
--   run 'longComputation' on both components and return both results.
smallCell :: PCell IO (String, String) (String, String)
smallCell = proc (left, right) -> do
  left' <- longComputation -< left
  right' <- longComputation -< right
  returnA -< (left', right')
-- | The same computation as 'smallCell', built directly as a parallel
--   composition ('Par' is what '***' stands for on 'PCell').
smallCell' :: PCell IO (String, String) (String, String)
smallCell' = Par longComputation longComputation
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment