Skip to content

Instantly share code, notes, and snippets.

@thelff
Created March 26, 2012 02:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thelff/2202469 to your computer and use it in GitHub Desktop.
Save thelff/2202469 to your computer and use it in GitHub Desktop.
sinkfork' and sinkforks
-- CL.sourceList [1..10] $$ sinkFork' (CL.consume) (CL.isolate 0 =$ CL.consume)
-- ([1],[])
{-# LANGUAGE ScopedTypeVariables #-}
import Data.Conduit
import qualified Data.Conduit.List as CL
import Control.Applicative ((<$>), (<*>), Applicative, pure, (<|>))
import Data.Functor.Identity (runIdentity)
import Debug.Trace
sinkFork :: (Applicative m, Monad m) => Sink a m b -> Sink a m c -> Sink a m (b, c)
sinkFork (SinkM mlsink) rsink = SinkM $ do
lsink <- mlsink
return $ sinkFork lsink rsink
sinkFork lsink (SinkM mrsink) = SinkM $ do
rsink <- mrsink
return $ sinkFork lsink rsink
sinkFork (Processing lpush lclose) (Processing rpush rclose) = Processing
(\i -> sinkFork (lpush i) (rpush i))
((,) <$> lclose <*> rclose)
sinkFork (Processing lpush lclose) (Done _ rres) = Processing
(\i -> sinkFork (lpush i) (Done Nothing rres))
((,) <$> lclose <*> pure rres)
sinkFork (Done _ lres) (Processing rpush rclose) = Processing
(\i -> sinkFork (Done Nothing lres) (rpush i))
((,) lres <$> rclose)
sinkFork (Done lleft lres) (Done rleft rres) = Done
(lleft <|> rleft)
(lres, rres)
-- stops as soon as a sink is done
sinkFork' :: (Applicative m, Monad m) => Sink a m b -> Sink a m c -> Sink a m (b, c)
sinkFork' (SinkM mlsink) rsink = SinkM $ do
lsink <- mlsink
return $ sinkFork' lsink rsink
sinkFork' lsink (SinkM mrsink) = SinkM $ do
rsink <- mrsink
return $ sinkFork' lsink rsink
sinkFork' (Processing lpush lclose) (Processing rpush rclose) = Processing
(\i -> sinkFork' (lpush i) (rpush i))
((,) <$> lclose <*> rclose)
sinkFork' (Processing lpush lclose) (Done _ rres) = SinkM $ do
a <- lclose
return $ sinkFork' (Done Nothing a) (Done Nothing rres)
sinkFork' (Done _ lres) (Processing rpush rclose) = SinkM $ do
a <- rclose
return $ sinkFork' (Done Nothing lres) (Done Nothing a)
sinkFork' (Done lleft lres) (Done rleft rres) = Done
Nothing -- (lleft <|> rleft)
(lres, rres)
sinkM (SinkM _) = True
sinkM _ = False
processing (Processing _ _) = True
processing _ = False
done (Done _ _) = True
done _ = False
sinky (SinkM sink) = sink
sinky sink = return sink
-- processes a list of sinks and stops as soon as one is done
sinkForks :: (Applicative m, Monad m) => [Sink a m b] -> Sink a m [b]
sinkForks sinks | any sinkM sinks =
SinkM $ sinkForks <$> mapM sinky sinks
sinkForks sinks | all processing sinks = Processing
(\i -> sinkForks (map (\(Processing push close) -> push i) sinks))
(mapM (\(Processing push close) -> close) sinks)
sinkForks sinks | all done sinks = Done Nothing (map (\(Done _ ret) -> ret) sinks)
sinkForks sinks | any done sinks = SinkM $ do
-- sinkClose should work but instead get could not deduce (b ~ ()) ...
newsinks::[b] <- mapM (\x -> case x of
Processing push close -> close
Done _ ret -> return ret
) sinks
return $ sinkForks $ map (Done Nothing) newsinks
main :: IO ()
main = do
a <- CL.sourceList [1..10]
$$ sinkFork
(CL.fold (+) (0 :: Int))
(CL.take 5)
print a
b <- CL.sourceList [1..10]
$$ sinkFork'
(CL.fold (+) (0 :: Int))
(CL.take 5)
print b
c <- CL.sourceList [1..10]
$$ sinkForks
[(CL.take 0)
,(CL.take 5)]
print c
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment