Is this what pipes does?
{-# LANGUAGE EmptyCase #-}
import Data.Void
import Control.Applicative
import Control.Monad.Trans
import Control.Monad
import Control.Monad.Writer
import System.IO
import qualified Data.ByteString as B
data Pipeline i o m a
= Done a
| Yield o (m (Pipeline i o m a))
| Await (i -> m (Pipeline i o m a))
| Eff (m (Pipeline i o m a))
type Producer o m a = Pipeline () o m a
type Consumer i m a = Pipeline i Void m a
instance (Functor m, Monad m) => Functor (Pipeline i o m) where
fmap f (Done a) = Done (f a)
fmap f (Yield o m) = Yield o $ do
next <- m
return $ fmap f next
fmap f (Await c) = Await $ \i -> fmap f <$> c i
fmap f (Eff m) = Eff $ fmap f <$> m
instance (Applicative m, Monad m) => Applicative (Pipeline i o m) where
pure = Done
(<*>) = ap
instance (Functor m, Monad m) => Monad (Pipeline i o m) where
return = Done
(Done m) >>= f = f m
(Yield o m) >>= f = Yield o $ (>>= f) <$> m
(Await c) >>= f = Await $ \i -> (>>=f) <$> c i
(Eff m) >>= f = Eff $ do
p <- m
return $ p >>= f
instance MonadTrans (Pipeline i o) where
lift m = Eff $ do
x <- m
return $ Done x
instance (MonadIO m, Functor m) => MonadIO (Pipeline i o m) where
liftIO = lift . liftIO
yield :: (Monad m, Functor m) => o -> Pipeline i o m ()
yield x = Yield x (return $ return ())
await :: (Monad m, Functor m) => Pipeline i o m i
await = Await (return . return)
(=$=) :: (Monad m, Applicative m) => Pipeline i o m () -> Pipeline (Maybe o) p m b -> Pipeline i p m b
(Done _) =$= (Done b) = Done b
(Done a) =$= (Yield o n) = Yield o $ do
p <- n
return $ Done a =$= p
(Done a) =$= (Eff m) = Eff $ do
p <- m
return $ Done a =$= p
(Done a) =$= (Await f) = Eff $ do
p <- f Nothing
return $ Done a =$= p
(Yield o c) =$= (Await f) = Eff $ do
c' <- c
p' <- f (Just o)
return $ c' =$= p'
(Yield _ _) =$= (Done a) = Done a
(Yield o c) =$= (Eff m) = Eff $ do
p <- m
return $ Yield o c =$= p
(Await c) =$= f = Await $ \i -> do
n <- c i
return $ n =$= f
a =$= (Yield o n') = Yield o $ do
p <- n'
return $ a =$= p
-- run the consumer as far as possible before requesting new values
-- from the producer
a =$= (Eff n) = Eff $ (=$=) <$> pure a <*> n
(Eff m) =$= n = Eff $ (=$=) <$> m <*> pure n
runPipeline :: (Monad m) => Pipeline () Void m a -> m a
runPipeline (Done a) = return a
runPipeline (Yield o _) = case o of { }
runPipeline (Await f) = f () >>= runPipeline
runPipeline (Eff m) = m >>= runPipeline
-- utilities
mapPipeline :: (Functor m, Monad m) => (a -> b) -> Pipeline (Maybe a) b m ()
mapPipeline f = do
x <- await
case x of
Just v -> yield (f v) >> mapPipeline f
Nothing -> return ()
collect :: (Monad m) => m (Maybe a) -> m [a]
collect m = do
v <- m
case v of
(Just x) -> collect m >>= (\rest -> return $ x:rest)
Nothing -> return []
produce :: (Functor m, Monad m) => [a] -> Producer a m ()
produce [] = return ()
produce (x:xs) = yield x >> produce xs
-- Demo:
foo :: IO [Integer]
foo = runPipeline $ producer =$= mapPipeline (+1) =$= collect await
where producer :: (MonadIO m, Functor m) => Producer Integer m ()
producer = do
yield 1
yield 2
liftIO $ putStr "How many times should I yield? "
count <- liftIO readLn
replicateM_ count $ yield 10101
yield 3
bar :: [String]
bar = snd $ runWriter $ runPipeline $ producer =$= consumer
where producer :: Producer String (Writer [String]) ()
producer = do
lift $ tell ["producer: going to yield a"]
yield "a"
lift $ tell ["producer: yielded a"]
lift $ tell ["producer: some stuff after yielding a"]
consumer :: Consumer (Maybe String) (Writer [String]) ()
consumer = do
(Just x) <- await
lift $ tell ["consumer: got " ++ x]
lift $ tell ["consumer: doing some stuff"]
-- File IO
cat :: IO ()
cat = runPipeline $ handleToProducer stdin =$= handleToConsumer stdout
handleToProducer :: (MonadIO m, Functor m) => Handle -> Producer B.ByteString m ()
handleToProducer handle = do
chunk <- liftIO $ B.hGetSome handle 1024
if chunk == B.empty
then return ()
else yield chunk >> handleToProducer handle
handleToConsumer :: (MonadIO m, Functor m) => Handle -> Consumer (Maybe B.ByteString) m ()
handleToConsumer handle = do
c <- await
case c of
(Just chunk) -> liftIO (B.hPut handle chunk) >> handleToConsumer handle
Nothing -> return ()
