Skip to content

Instantly share code, notes, and snippets.

@twanvl
Created June 15, 2012 20:02
Show Gist options
  • Save twanvl/2938456 to your computer and use it in GitHub Desktop.
Save twanvl/2938456 to your computer and use it in GitHub Desktop.
-- Applicative concurrent computation without unnecessary threads.
{-# LANGUAGE ImplicitParams #-}
import Control.Monad
import Control.Monad.Fix
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Exception
import Prelude hiding (catch)
import Data.IORef
import Data.Traversable
--import GHC.IO hiding (onException)
import System.IO
import Text.Printf
-- | Debugging aid: write a message to standard output, prefixed with the id
-- of the thread that called it. Standard output is set to line buffering on
-- every call before the line is printed.
tr :: PrintfArg a => a -> IO ()
tr msg =
  hSetBuffering stdout LineBuffering
    >> myThreadId
    >>= \self -> printf "thread %s says: %s\n" (show self) msg
--------------------------------------------------------------------------------
-- The Concurrently applicative functor
--------------------------------------------------------------------------------
-- | An action that cancels work previously started by a 'Concurrently'
-- computation (for 'async' this is a @killThread@ on the worker thread).
type Stopper = IO ()
-- | A callback that accepts a value of type @a@ and hands it over to whoever
-- is collecting results ('runConcurrently' passes @putMVar postbox@ here).
type Storer a = a -> IO ()
-- | A concurrent computation producing an @a@.
--
-- Starting the computation means applying 'getConcurrently' to a 'Storer'
-- callback; the call returns a 'Stopper' that cancels whatever was started.
-- Each value handed to the callback is itself an @IO@ action, to be executed
-- by the consumer:
--
-- * it yields @Just a@ when the final result is available,
-- * it yields @Nothing@ when more pieces are still outstanding (the consumer
--   is expected to keep waiting), and
-- * it may throw, which is how a failure inside a worker is reported.
newtype Concurrently a = Concurrently
  { getConcurrently :: Storer (IO (Maybe a)) -> IO Stopper
  }
-- | Lift an 'IO' action into 'Concurrently' by running it on a freshly forked
-- thread.
--
-- When the action finishes, the worker hands the callback an action that
-- returns @Just result@; when it throws, the worker hands over an action that
-- rethrows the exception in the consumer. The returned 'Stopper' kills the
-- worker thread.
--
-- The user action is run under @unmask@: 'runConcurrently' starts workers
-- from inside 'mask', and a forked thread inherits its parent's masking
-- state, so without unmasking the action could only be cancelled at
-- interruptible operations. The exception handler is installed before
-- unmasking, so a cancellation that arrives while the action runs is still
-- caught.
--
-- NOTE(review): 'catchAll' also catches the @ThreadKilled@ raised by the
-- stopper itself, so a cancelled worker still tries to store a rethrowing
-- action. Confirm that consumers tolerate such a late message.
async :: IO a -> Concurrently a
async io = Concurrently $ \store -> do
  tid <- forkIOWithUnmask $ \unmask -> do
    outcome <-
      fmap (return . Just) (unmask io) `catchAll` (return . throwIO)
    -- Delivery happens outside the handler, exactly once per worker.
    store outcome
  return (killThread tid)
-- | 'catch' specialised to 'SomeException', i.e. a handler that sees every
-- exception raised by the action, whatever its concrete type.
catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll action handler = action `catch` handler
-- | Mapping a function over a 'Concurrently' post-processes every delivered
-- result: the function is applied inside the @Maybe@ that each stored action
-- yields, before the action reaches the original callback.
instance Functor Concurrently where
  fmap f (Concurrently run) = Concurrently $ \store ->
    run $ \result -> store (fmap f <$> result)
-- | Both sides of '<*>' are started right away and share one mailbox cell.
-- Whichever side is processed first parks its value in the cell and reports
-- @Nothing@ (not finished yet); the other side then finds the parked value,
-- applies the function to the argument and reports @Just@ the result.
--
-- The cell is only read and written by the stored actions, i.e. by whoever
-- executes the delivered @IO (Maybe a)@ values, not by the workers.
instance Applicative Concurrently where
  -- Goes through 'async'; the original author notes this cannot be done
  -- without it.
  pure = async . return

  Concurrently runF <*> Concurrently runX = Concurrently $ \store -> do
      slot <- newIORef Nothing
      stopF <- runF $ \r -> store (r >>= offer slot . fmap Left)
      stopX <- runX $ \r -> store (r >>= offer slot . fmap Right)
      return (stopF >> stopX)
    where
      -- A side that has nothing to contribute yet stays unfinished.
      offer _ Nothing = return Nothing
      -- Otherwise either park the value or pair it with the parked one.
      offer slot (Just new) =
        readIORef slot
          >>= maybe
            (writeIORef slot (Just new) >> return Nothing)
            (return . pair new)

      -- Only a function together with an argument produces a result; two
      -- values from the same side do not.
      pair (Left f) (Right v) = Just (f v)
      pair (Right v) (Left f) = Just (f v)
      pair _ _ = Nothing
-- | 'empty' starts nothing and never delivers a result. '<|>' starts both
-- alternatives; every action either of them delivers first runs the combined
-- stopper for both and then yields that alternative's own outcome.
instance Alternative Concurrently where
  empty = Concurrently (const (return (return ())))

  Concurrently runX <|> Concurrently runY = Concurrently $ \store ->
    -- The combined stopper is only known once both sides have been started,
    -- so it is tied back lazily through 'mfix'; it must not be forced before
    -- this block has returned.
    mfix $ \stopBoth -> do
      let deliver result = store (stopBoth >> result)
      stopX <- runX deliver
      stopY <- runY deliver
      return (stopX >> stopY)
-- | Execute a 'Concurrently' computation and wait for its result, using one
-- thread per 'async'.
--
-- Workers deliver their outcomes through an 'MVar'. Each delivered action is
-- executed here; @Nothing@ means "not finished yet" and waiting continues,
-- @Just a@ ends the wait. The stopper is run both when waiting is aborted by
-- an exception and after a result has been obtained.
--
-- Note: per the original author, the implementation is inspired by the
-- @concurrently'@ helper of the async package ("Control.Concurrent.Async").
runConcurrently :: Concurrently a -> IO a
runConcurrently (Concurrently start) = do
  postbox <- newEmptyMVar
  mask $ \restore -> do
    -- Workers are started while asynchronous exceptions are masked.
    stop <- start (putMVar postbox)
    let awaitResult = join (takeMVar postbox) >>= maybe awaitResult return
    result <- restore awaitResult `onException` stop
    stop
    return result
--------------------------------------------------------------------------------
-- Derived functions
--------------------------------------------------------------------------------
-- | Run two actions at the same time, each on its own thread, and return both
-- results as a pair.
concurrently :: IO a -> IO b -> IO (a,b)
concurrently x y = runConcurrently (liftA2 (,) (async x) (async y))
-- | Run two actions at the same time and return the result of whichever one
-- is delivered first; the stoppers of both are run at that point.
race :: IO a -> IO a -> IO a
race x y = runConcurrently (async x <|> async y)
-- | Apply an effectful function to every element of a container, running
-- each application via 'async', and collect the results in the same shape.
doConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
doConcurrently f items =
  runConcurrently (traverse (\item -> async (f item)) items)
--------------------------------------------------------------------------------
-- Example
--------------------------------------------------------------------------------
-- | Stand-in for a download: logs the request through 'tr' and returns the
-- given string wrapped in angle brackets.
getURL :: String -> IO String
getURL s = tr ("getUrl " ++ s) >> return (concat ["<", s, ">"])
-- | Demo driver: runs each example in turn and prints what it produced.
main :: IO ()
main = do
  -- Three fetches combined applicatively.
  (a1, a2, a3) <-
    runConcurrently $
      (,,)
        <$> async (getURL "url1")
        <*> async (getURL "url2")
        <*> async (getURL "url3")
  print [a1, a2, a3]

  -- Alternatives (including 'empty') nested inside an applicative.
  (b1, b2, b3) <-
    runConcurrently $
      (,,)
        <$> (async (getURL "url1") <|> empty)
        <*> ( empty
                <|> async (threadDelay 1 >> getURL "url2")
                <|> async (threadDelay 0 >> getURL "url2b")
            )
        <*> async (getURL "url3")
  print [b1, b2, b3]

  -- Traversing a list.
  fetched <- doConcurrently getURL ["url1", "url2", "url3"]
  print fetched

  -- A plain choice between alternatives.
  winner <-
    runConcurrently $
      async (getURL "url1")
        <|> empty
        <|> async (getURL "url2")
  print winner

  -- One branch calls 'error' after a short delay.
  (c1, c2, c3) <-
    runConcurrently $
      (,,)
        <$> async (threadDelay 1000 >> error "fail")
        <*> async (threadDelay 2000 >> getURL "url2")
        <*> async (getURL "url3")
  print [c1, c2, c3]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment