@turion
Created June 5, 2020 12:28
{-# LANGUAGE Arrows #-}
{-# LANGUAGE DefaultSignatures #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE TupleSections #-}
-- base
import Control.Arrow
import Control.Category
import Control.Concurrent
import Control.Monad (guard, (>=>), void)
import Data.Data
import Prelude hiding ((.), id)
-- | Fun fact: Every 'ArrowChoice' allows for parallelism.
mapArr :: ArrowChoice arr => arr a b -> arr [a] [b]
mapArr myArr = proc input -> do
  case input of
    [] -> returnA -< []
    a:as -> do
      b <- myArr -< a
      bs <- mapArr myArr -< as
      returnA -< b : bs
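-- | A small usage sketch (illustrative only, ad-hoc name): 'mapArr' applied to a
-- 'Kleisli' arrow. Running @runKleisli mapArrExample [1, 2, 3]@ should print each
-- element in turn and return a list of units.
mapArrExample :: Kleisli IO [Int] [()]
mapArrExample = mapArr $ Kleisli print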
-- * Parallel cells
-- | An arrow supporting effects in 'm', and internal state.
-- Basically a free arrow construction
data PCell m a b where
  Id :: PCell m a a
  ArrM :: (a -> m b) -> PCell m a b
  Par :: PCell m a b -> PCell m c d -> PCell m (a, c) (b, d)
  -- | I don't have @Second@ because it's not used in desugaring
  First :: PCell m a b -> PCell m (a, c) (b, c)
  Seq :: PCell m a b -> PCell m b c -> PCell m a c
  Choice :: PCell m a b -> PCell m c d -> PCell m (Either a c) (Either b d)
  -- State :: Data s => s -> PCell m (Maybe s) s
  -- State :: Data s => s -> (s -> a -> m (b, s)) -> PCell m a b
  -- | The 'Data' constraint is important for livecoding
  Feedback :: Data s => s -> PCell m (a, s) (b, s) -> PCell m a b
  -- Annoying, but seems necessary
  AssocL :: PCell m (a, (b, c)) ((a, b), c)
  AssocR :: PCell m ((a, b), c) (a, (b, c))
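-- | An illustrative cell built directly from the constructors (a sketch, ad-hoc name):
-- a counter that ignores its input and outputs how many inputs it has seen so far.
counterCell :: Monad m => PCell m a Int
counterCell = Feedback (0 :: Int) $ ArrM $ \(_a, n) -> return (n + 1, n + 1)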
inspect :: PCell m a b -> String
inspect Id = "Id"
inspect (ArrM _) = "ArrM"
inspect (Par cell1 cell2) = "(Par " ++ inspect cell1 ++ " " ++ inspect cell2 ++ ")"
inspect (First cell) = "(First " ++ inspect cell ++ ")"
inspect (Seq cell1 cell2) = "(Seq " ++ inspect cell1 ++ " " ++ inspect cell2 ++ ")"
inspect (Choice cell1 cell2) = "(Choice " ++ inspect cell1 ++ " " ++ inspect cell2 ++ ")"
inspect (Feedback _ cell) = "(Feedback " ++ inspect cell ++ ")"
inspect AssocL = "AssocL"
inspect AssocR = "AssocR"
instance Monad m => Category (PCell m) where
  id = Id
  (.) = flip Seq
instance Monad m => Arrow (PCell m) where
  arr = ArrM . (return .)
  (***) = Par
  first = First
instance Monad m => ArrowChoice (PCell m) where
  (+++) = Choice
-- -- | To build up state: An initialized feedback loop.
-- feedback
-- :: Data s
-- => s
-- -> PCell m (a, s) (b, s)
-- -> PCell m a b
-- feedback s0 cell = proc a -> do
-- sLast <- State s0 -< s
-- * Optimization
-- | This is the workhorse.
-- Rearrange the cell so as to make optimal use of parallelism,
-- and apply some other efficiency tweaks.
-- This means increasing the length of sequential compositions,
-- and making the constituents of parallel compositions as independent as possible.
--
-- The basic idea is similar to @ApplicativeDo@:
-- Every time we use several arguments at the same time,
-- this creates a fence in the parallel execution.
-- A lot of these fences are unnecessary,
-- for example when two separate functions create the elements of a tuple,
-- and then two other separate functions use each element of the tuple independently.
--
-- Caution: This function does not preserve the guarantee that in 'cell1 *** cell2',
-- the effects of 'cell1' are executed before those of 'cell2'!
optimize
  :: Monad m
  => PCell m a b
  -> PCell m a b
-- This is the main optimization:
-- Instead of first executing two cells in parallel,
-- collecting their results in a tuple and then dividing the tuple again,
-- execute both lines in parallel.
optimize (Seq (Par cell1L cell1R) (Par cell2L cell2R))
  = optimize $ Par (Seq cell1L cell2L) (Seq cell1R cell2R)
-- Efficiency tweaks: Fuse atomic operations together
optimize (Seq Id cell) = optimize cell
optimize (Seq cell Id) = optimize cell
optimize (Seq (ArrM f) (ArrM g)) = ArrM $ f >=> g
-- optimize (ArrM f) (State s0 g) = State s $ _
-- optimize (State s0 f) (ArrM g) = State s $ _
optimize (Par (ArrM f) Id) = ArrM $ \(a, c) -> (, c) <$> f a
optimize (Par Id (ArrM f)) = ArrM $ \(a, c) -> (a, ) <$> f c
-- Normalization: Group combinators on the left side and try to optimize
optimize (Seq cell1 (Seq cell2 cell3))
  -- Can we maybe optimize the second part individually?
  = case optimize (Seq cell2 cell3) of
      -- No huge change. Group to the left and try to optimize.
      Seq cell2' cell3' -> optimize $ Seq (Seq cell1 cell2') cell3'
      -- Something moved considerably.
      cell23 -> optimize $ Seq cell1 cell23
optimize (Par cell1 (Par cell2 cell3)) = optimize $ Seq (Seq AssocL (Par (Par cell1 cell2) cell3)) AssocR
-- Try to get rid of the associators again
optimize (Seq AssocR AssocL) = Id
optimize (Seq AssocL AssocR) = Id
-- Fuse and normalize feedback loops: Move feedback/state as far out as possible.
optimize (Feedback s1 (Feedback s2 cell)) = optimize $ Feedback (s1, s2) $ Seq (Seq AssocL cell) AssocR
optimize (Seq cell1 (Feedback s cell2)) = optimize $ Feedback s $ Seq (Par cell1 Id) cell2
optimize (Seq (Feedback s cell1) cell2) = optimize $ Feedback s $ Seq cell1 (Par cell2 Id)
optimize (Par cell1 (Feedback s cell2)) = optimize $ Feedback s $ Seq (Seq AssocR $ Par cell1 cell2) AssocL
-- TODO To get this one to run I need to introduce Swap as a primitive as well
-- optimize (Par (Feedback s cell1) cell2) = optimize $ Feedback s $ Seq (Seq AssocR _) AssocL
-- Recursion: We didn't find immediate optimizations,
-- let's dig deeper and see what we can find there.
--
-- TODO: I'd really like to apply optimize again to the result,
-- but that would not terminate.
-- I could return, from each individual optimization,
-- whether some optimization took place,
-- and if yes, optimize the whole thing again.
optimize (Seq cell1 cell2) = Seq (optimize cell1) (optimize cell2)
optimize (Par cell1 cell2) = Par (optimize cell1) (optimize cell2)
optimize (Feedback s cell) = Feedback s $ optimize cell
optimize (First cell) = optimize $ Par cell Id
-- TODO Optimizations for ArrowChoice
-- Catch-all: We didn't find any further optimization opportunities.
optimize cell = cell
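-- | A tiny illustration of the main rewrite (ad-hoc name):
-- a sequence of two parallel pairs should fuse into a single parallel pair,
-- so this should print @(Par Id Id)@.
optimizeExample :: IO ()
optimizeExample =
  putStrLn $ inspect $ optimize (Seq (Par Id Id) (Par Id Id) :: PCell IO ((), ()) ((), ()))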
-- * Parallel monads
-- | 'Applicative's that support parallel execution.
-- If you use 'runPar', you don't get a guarantee on which part is executed first.
class Applicative m => ApplicativePar m where
  -- | Runs both computations at the same time and returns the result of both.
  -- Like async's @concurrently@.
  runPar :: m a -> m b -> m (a, b)
  -- Every 'Applicative' can do this, but not necessarily in a fast way.
  default runPar :: m a -> m b -> m (a, b)
  runPar ma mb = (,) <$> ma <*> mb
-- | Launch separate threads for each computation and collect the results in 'MVar's.
instance ApplicativePar IO where
  runPar a b = do
    varA <- newEmptyMVar
    varB <- newEmptyMVar
    threadA <- forkIO $ a >>= putMVar varA
    threadB <- forkIO $ b >>= putMVar varB
    resultA <- takeMVar varA
    resultB <- takeMVar varB
    killThread threadA
    killThread threadB
    return (resultA, resultB)
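-- A possible alternative (a sketch, assuming the @async@ package as a dependency):
-- @runPar = Control.Concurrent.Async.concurrently@,
-- which would also take care of exception propagation and thread cleanup.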
-- * Parallel execution
-- | Feed a single input event to a cell,
-- using parallelism to execute it.
stepPCell
  :: (Monad m, ApplicativePar m)
  => PCell m a b
  -> a
  -> m (b, PCell m a b)
-- This is the whole point.
-- Execute parallel cells using the parallelism capabilities of the monad.
stepPCell (Par cell1 cell2) (a, b) = do
  ((c, cell1'), (d, cell2')) <- runPar (stepPCell cell1 a) (stepPCell cell2 b)
  return ((c, d), Par cell1' cell2')
stepPCell (First cell) (a, c) = do
  (b, cell') <- stepPCell cell a
  return ((b, c), First cell')
stepPCell Id a = return (a, Id)
stepPCell cell@(ArrM f) a = ( , cell) <$> f a
-- stepPCell (State s f) a = do
-- (b, s') <- f a
-- return (b, State s' f)
stepPCell (Feedback s cell) a = do
  ((a', s'), cell') <- stepPCell cell (a, s)
  return (a', Feedback s' cell')
stepPCell (Seq cell1 cell2) a = do
  (b, cell1') <- stepPCell cell1 a
  (c, cell2') <- stepPCell cell2 b
  return (c, Seq cell1' cell2')
stepPCell (Choice cell1 cell2) (Left a) = do
  (b, cell1') <- stepPCell cell1 a
  return (Left b, Choice cell1' cell2)
stepPCell (Choice cell1 cell2) (Right c) = do
  (d, cell2') <- stepPCell cell2 c
  return (Right d, Choice cell1 cell2')
stepPCell AssocL (a, (b, c)) = return (((a, b), c), AssocL)
stepPCell AssocR ((a, b), c) = return ((a, (b, c)), AssocR)
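-- | A single-step usage sketch (ad-hoc name, using the illustrative 'counterCell' above):
-- the returned continuation cell carries the updated feedback state.
stepExample :: IO ()
stepExample = do
  (n, cell') <- stepPCell (counterCell :: PCell IO () Int) ()
  print n
  putStrLn $ inspect cell'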
-- | Feed a stream of inputs into a 'PCell'.
runPCell
  :: (Monad m, ApplicativePar m)
  => PCell m a b
  -> [a]
  -> m [b]
runPCell _ [] = return []
runPCell cell (a : as) = do
  (b, cell') <- stepPCell cell a
  bs <- runPCell cell' as
  return $ b : bs
runPCell_
  :: (Monad m, ApplicativePar m)
  => PCell m a b
  -> [a]
  -> m ()
runPCell_ cell a = void $ runPCell cell a
-- * Caching
type Hash = Int
-- | I'm sure there are proper libraries for this kind of thing out there.
class Hashable a where
  hash :: a -> Hash
  -- | I'm sure one can write a default instance, I'm just too lazy right now.
  -- It's also not super important to have one.
  default hash :: Data a => a -> Hash
  hash = undefined
-- Ok, I'm cheating here.
instance Hashable Int where
  hash = id
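-- | Another quick instance sketch (illustrative only; still cheating, not a real hash):
instance Hashable Char where
  hash = fromEnum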
-- | If the inputs haven't changed, don't recompute.
cached
  :: (Monad m, Data b, Hashable a)
  => PCell m a b
  -> PCell m a b
cached cell = Feedback Nothing $ proc (a, cache) -> do
  let aHash = hash a
  case validateCache cache aHash of
    Nothing -> do
      b <- cell -< a
      let newCache = Just (b, aHash)
      returnA -< (b, newCache)
    Just b -> do
      returnA -< (b, cache)
-- | Look up whether the cache is still valid,
-- and if yes, return its content
validateCache :: Maybe (b, Hash) -> Hash -> Maybe b
validateCache maybeCache newHash = do
  (content, oldHash) <- maybeCache
  guard $ oldHash == newHash
  return content
-- * Sundry
-- TODO Quickcheck that 'optimize' preserves semantics and is idempotent
-- instance (Coarbitrary a, Arbitrary b) => Arbitrary (PCell m) where
-- * Try it out!
main :: IO ()
main = do
  putStrLn "Pure function *2:"
  print =<< runPCell (arr (*2)) [1,2,3::Int]
  putStrLn "Side effects:"
  runPCell_ (ArrM print <<< arr (*2)) [1,1,2,2,3::Int]
  putStrLn "Caching. Will not print duplicates:"
  runPCell_ (cached $ ArrM print <<< arr (*2)) [1,1,2,2,3::Int]
  putStrLn "Unoptimized:"
  runPCell_ mainCell [("Ugh,", "this", "takes", "forever!")]
  putStrLn "Optimized:"
  runPCell_ (optimize $ optimize $ optimize mainCell) [("Oh,", "going", "pretty", "fast!")]
-- | Block for some time and then return the value
longComputation :: Show a => PCell IO a a
longComputation = optimize $ optimize $ optimize $ proc a -> do
  _ <- ArrM threadDelay -< 1000000
  _ <- ArrM print -< a
  returnA -< a
-- FIXME mapArr recurses and thus builds up an infinite cell.
-- mainCell :: PCell IO [String] [()]
-- mainCell = mapArr $ longComputation 1000000 >>> ArrM putStrLn
mainCell :: PCell IO (String, String, String, String) ()
mainCell = proc (s1, s2, s3, s4) -> do
  longComputation -< s1
  longComputation -< s2
  longComputation -< s3
  longComputation -< s4
  returnA -< ()
smallCell :: PCell IO (String, String) (String, String)
smallCell = proc (s1, s2) -> do
  s1' <- longComputation -< s1
  s2' <- longComputation -< s2
  returnA -< (s1', s2')
smallCell' :: PCell IO (String, String) (String, String)
smallCell' = longComputation *** longComputation
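-- | A quick comparison sketch (ad-hoc name): print the optimized shapes of
-- 'smallCell' and 'smallCell'' via 'inspect' and eyeball whether they agree.
compareSmallCells :: IO ()
compareSmallCells = do
  putStrLn $ inspect $ optimize smallCell
  putStrLn $ inspect $ optimize smallCell'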