Created
March 26, 2012 02:22
-
-
Save thelff/2202469 to your computer and use it in GitHub Desktop.
sinkfork' and sinkforks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Example (the value it produces is shown on the last line):
--   CL.sourceList [1..10] $$ sinkFork' CL.consume (CL.isolate 0 =$ CL.consume)
--   ([1],[])
{-# LANGUAGE ScopedTypeVariables #-} | |
import Data.Conduit | |
import qualified Data.Conduit.List as CL | |
import Control.Applicative ((<$>), (<*>), Applicative, pure, (<|>)) | |
import Data.Functor.Identity (runIdentity) | |
import Debug.Trace | |
-- | Run two sinks side by side on the same input and pair their results.
--
-- Every pushed value is handed to each sink that is still 'Processing'. A sink
-- that reaches 'Done' first has its result held (its leftover is dropped)
-- while the other keeps receiving input, so the combined sink only becomes
-- 'Done' once both sides are.
sinkFork :: (Applicative m, Monad m) => Sink a m b -> Sink a m c -> Sink a m (b, c)
sinkFork left right = case (left, right) of
    -- Resolve pending monadic steps first, the left sink before the right.
    (SinkM stepL, _) -> SinkM $ (\left' -> sinkFork left' right) <$> stepL
    (_, SinkM stepR) -> SinkM $ sinkFork left <$> stepR
    (Processing pushL closeL, Processing pushR closeR) ->
        Processing
            (\x -> sinkFork (pushL x) (pushR x))
            ((,) <$> closeL <*> closeR)
    -- Right side already finished: keep feeding the left one only.
    (Processing pushL closeL, Done _ doneR) ->
        Processing
            (\x -> sinkFork (pushL x) (Done Nothing doneR))
            ((\resL -> (resL, doneR)) <$> closeL)
    -- Left side already finished: keep feeding the right one only.
    (Done _ doneL, Processing pushR closeR) ->
        Processing
            (\x -> sinkFork (Done Nothing doneL) (pushR x))
            ((\resR -> (doneL, resR)) <$> closeR)
    -- Both finished: report the left leftover if present, else the right one.
    (Done extraL doneL, Done extraR doneR) ->
        Done (extraL <|> extraR) (doneL, doneR)
-- | Like 'sinkFork', but stops as soon as either sink is done.
--
-- While both sinks are 'Processing', every input is pushed to both. The moment
-- one side is 'Done', the other side is closed right away and the pair of
-- results is returned. The final 'Done' never carries a leftover.
sinkFork' :: (Applicative m, Monad m) => Sink a m b -> Sink a m c -> Sink a m (b, c)
sinkFork' left right = case (left, right) of
    -- Resolve pending monadic steps first, the left sink before the right.
    (SinkM stepL, _) -> SinkM $ (\left' -> sinkFork' left' right) <$> stepL
    (_, SinkM stepR) -> SinkM $ sinkFork' left <$> stepR
    (Processing pushL closeL, Processing pushR closeR) ->
        Processing
            (\x -> sinkFork' (pushL x) (pushR x))
            ((,) <$> closeL <*> closeR)
    -- One side is finished: close the other immediately and stop.
    (Processing _ closeL, Done _ doneR) ->
        SinkM $ (\resL -> Done Nothing (resL, doneR)) <$> closeL
    (Done _ doneL, Processing _ closeR) ->
        SinkM $ (\resR -> Done Nothing (doneL, resR)) <$> closeR
    (Done _ doneL, Done _ doneR) -> Done Nothing (doneL, doneR)
-- | 'True' exactly when the sink is in the 'SinkM' state, i.e. it wraps a
-- monadic action that yields the next sink.
sinkM :: Sink a m b -> Bool
sinkM (SinkM _) = True
sinkM _ = False
-- | 'True' exactly when the sink is in the 'Processing' state, i.e. it still
-- offers a push function and a close action.
processing :: Sink a m b -> Bool
processing (Processing _ _) = True
processing _ = False
-- | 'True' exactly when the sink is in the 'Done' state, i.e. it already
-- holds its final result.
done :: Sink a m b -> Bool
done (Done _ _) = True
done _ = False
-- | Advance a sink by one monadic step if it is a 'SinkM'; any other sink is
-- returned unchanged. Only one layer is unwrapped, so the result may itself
-- be a 'SinkM'.
sinky :: Monad m => Sink a m b -> m (Sink a m b)
sinky (SinkM sink) = sink
sinky sink = return sink
-- | Feed the same input to a whole list of sinks and stop as soon as any one
-- of them is done, returning every sink's result in list order.
--
-- * If any sink is a 'SinkM', all pending monadic steps are run first (in
--   list order) and the fork is retried on the resulting sinks.
-- * While every sink is 'Processing', each input is pushed to all of them;
--   closing the fork closes them all in list order.
-- * As soon as at least one sink is 'Done', the sinks that are still
--   'Processing' are closed and the fork finishes without a leftover.
--
-- With an empty list no sink can ever finish, so the fork keeps accepting
-- input and yields @[]@ when it is closed.
sinkForks :: (Applicative m, Monad m) => [Sink a m b] -> Sink a m [b]
sinkForks sinks
    | any sinkM sinks =
        SinkM $ sinkForks <$> mapM sinky sinks
    | Just running <- mapM asProcessing sinks =
        Processing
            (\i -> sinkForks [push i | (push, _) <- running])
            (mapM snd running)
    | Just results <- mapM asDone sinks =
        Done Nothing results
    | otherwise =
        -- Mixed 'Processing' / 'Done': close whatever is still running.
        -- NOTE(review): the original author reported a @b ~ ()@ type error
        -- when trying conduit's sinkClose here, presumably because it
        -- discards the sink's result -- confirm against the conduit API.
        SinkM $ Done Nothing <$> mapM finish sinks
  where
    -- Total views of the constructors, so no partial lambda patterns are
    -- needed in the guards above.
    asProcessing (Processing push close) = Just (push, close)
    asProcessing _ = Nothing

    asDone (Done _ ret) = Just ret
    asDone _ = Nothing

    -- Obtain a sink's final result, closing it if it is still running.
    finish (Processing _ close) = close
    finish (Done _ ret) = return ret
    -- Not reachable from the guards above (a 'SinkM' is resolved first);
    -- handled anyway so that the helper is total.
    finish (SinkM next) = next >>= finish
-- | Demo: run each fork combinator over the numbers 1 to 10 and print what
-- it returns.
main :: IO ()
main = do
    -- sinkFork: a running sum next to a five-element take.
    print =<< (CL.sourceList [1 .. 10]
                $$ sinkFork (CL.fold (+) (0 :: Int)) (CL.take 5))
    -- sinkFork': the same pair of sinks with the early-stopping variant.
    print =<< (CL.sourceList [1 .. 10]
                $$ sinkFork' (CL.fold (+) (0 :: Int)) (CL.take 5))
    -- sinkForks: list version, with a zero-element and a five-element take.
    print =<< (CL.sourceList [1 .. 10]
                $$ sinkForks [CL.take 0, CL.take 5])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment