public
Created

Error conscious, pure iteratee library (based on pipes)

  • Download Gist
main.hs
Haskell
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
{-# LANGUAGE FlexibleInstances, DeriveDataTypeable #-}
 
import Data.Char
import Data.Typeable
 
import Control.Monad
import Control.Exception
import Control.Monad.Trans
 
import System.IO
 
-- The result of a push or pull operation
data Result i e =
Err e |
Success i |
EOF
 
data Step i o e m a =
Done a |
Pull (Result i e -> Pipe i o e m a) |
Push o (Result () e -> Pipe i o e m a) |
Error e
 
-- The main pipe type
data Pipe i o e m a = Pipe {
runPipe :: m (Step i o e m a)
}
 
instance Monad m => Monad (Pipe i o e m) where
return = Pipe . return . Done
m >>= f = Pipe $ do
step <- runPipe m
case step of
Done x -> runPipe $ f x
Error e -> runPipe $ pipeErr e
Pull c -> return $ Pull (\i -> c i >>= f)
Push o c -> return $ Push o (\i -> c i >>= f)
 
instance MonadTrans (Pipe i o e) where
lift m = Pipe $ do
v <- m
return $ Done v
 
instance MonadIO m => MonadIO (Pipe i o SomeException m) where
liftIO m = Pipe $ do
v <- liftIO $ try m
case v of
Right v' -> return $ Done v'
Left e -> runPipe $ pipeErr e
 
data EOFError = EOFError deriving (Show, Typeable)
 
instance Exception EOFError
 
-- pull a value
pull :: Monad m => Pipe i o SomeException m i
pull = do
x <- Pipe $ return $ Pull return
case x of
Err e -> pipeErr e
Success i -> return i
EOF -> pipeErr $ SomeException EOFError
 
-- push a value
push :: Monad m => o -> Pipe i o SomeException m ()
push o = do
x <- Pipe $ return $ Push o return
case x of
Err e -> pipeErr e
Success () -> return ()
EOF -> pipeErr $ SomeException EOFError
 
-- throw an error
pipeErr :: Monad m => e -> Pipe i o e m a
pipeErr e = Pipe $ return $ Error e
 
-- catch an error
pipeCatch :: Monad m => Pipe i o e m a -> (e -> Pipe i o e m a) -> Pipe i o e m a
pipeCatch pipe handler = Pipe $ do
step <- runPipe pipe
runPipe $ case step of
Done a -> return a
Error e -> handler e
Pull c -> Pipe $ return $ Pull (\i -> pipeCatch (c i) handler)
Push o c -> Pipe $ return $ Push o (\i -> pipeCatch (c i) handler)
 
-- create big pipes from small pipes
($>) :: Monad m => Pipe i x e m a -> Pipe x o e m b -> Pipe i o e m (a, b)
a $> b = Pipe $ do
a' <- runPipe a
b' <- runPipe b
runPipe $ case (a', b') of
(_, Push o c) -> do
x <- Pipe $ return $ Push o return
(Pipe $ return a') $> (c x)
(Error e, Error _) ->
pipeErr e
(Error e, Done _) ->
pipeErr e
(Error e, Pull c) ->
(pipeErr e) $> (c (Err e))
(Pull c, Error e) ->
(c (Err e)) $> (pipeErr e)
(Push _ c, Error e) ->
(c (Err e)) $> (pipeErr e)
(Done _, Error e) ->
pipeErr e
(Push o c, Pull c') ->
(c (Success ())) $> (c' (Success o))
(Pull c, _) -> do
x <- Pipe $ return $ Pull return
(c x) $> (Pipe $ return b')
(Done _, Pull c) ->
(Pipe $ return a') $> (c EOF)
(Push _ c, Done _) ->
(c EOF) $> (Pipe $ return b')
(Done a, Done b) ->
Pipe $ return $ Done (a, b)
 
-- enumerate the characters in a file
enumFile :: String -> Pipe () Char SomeException IO ()
enumFile path = do
fd <- liftIO $ openFile path ReadMode
(forever $ do
c <- liftIO $ hGetChar fd
push c) `pipeCatch` (\e -> (liftIO $ hClose fd) >> pipeErr e)
 
-- dump a stream of characters into a file
iterFile :: String -> Pipe Char () SomeException IO ()
iterFile path = do
fd <- liftIO $ openFile path WriteMode
(forever $ do
c <- pull
liftIO $ hPutChar fd c) `pipeCatch` (\e -> (liftIO $ hClose fd) >> pipeErr e)
 
-- take a complete pipeline and run it
runPipeline :: Monad m => Pipe () () e m a -> m (Either e a)
runPipeline pipe = do
step <- runPipe pipe
case step of
Done x -> return $ Right x
Error e -> return $ Left e
Pull _ -> error "unexpected pull"
Push _ _ -> error "unexpected push"
 
-- lift a pure function into a pipe
createPipe :: Monad m => (i -> o) -> Pipe i o SomeException m a
createPipe fn =
forever $ pull >>= (push . fn)
 
-- read the characters from src, capitalize them, then dump them into dst
main = runPipeline ((enumFile "src") $> (createPipe toUpper) $> (iterFile "dst"))

Personally I would think that "SomeException" would be better than "IOError" on line 43. Also, why's the error type a "Maybe" there?

That's a sensible suggestion; I think I'll do that. It's a maybe type because of a little hack to handle EOF.

Done. Yes, I think that's much better.

hGetChar will be a really, really slow way to get data out of a file (similar objection to hPutChar). How would your example change if chunks of chars were read/written instead of single chars?

@dagit, This is obviously just an example. In the real world I'd use Text or ByteString. I'd read the file in something like 4kB chunks, transform the data in bulk and write it in bulk too. I did consider adding the ability to "unpull" so that you could pull part of a ByteString for example, but I haven't made a firm decision about it yet.

@DanielWaterworth, I think making this example a bit more "real-world" would help people assess how good this solution is. For example, if you changed it to read in chunks but you wanted to process per char how much impact would that have on the surrounding code. Can you get away with only changing enumFile? What happens if I don't fully process a chunk? Can I "rechunk" the data? (eg., go from 4kb chunks to lines and maintain composability, iteratees try to make that possible)

Edit: fixed some spelling errors

For communicating chunks between coroutines, have a look at the incremental-parser package. It solves the common problem with the chunks, namely that the producer's and the consumer's ideas of chunk boundaries rarely agree. The consumer can specify the grammar of the chunk it's interested in, so there's no need to "unpull" anything.

@dagit, I could certainly demonstrate more of what's possible, but I don't want to over complicate the example. It is possible to create a pipe that rechunks data, ie pulls arbitrary chunks and pushes lines.

@blamario, yes, but then you have to consistently use the incremental-parser package.

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.