Create a gist now

Instantly share code, notes, and snippets.

Understanding the Pipes library

In response to this reddit post about failing to understand the Pipes library after a couple of hours. I wanted to show how an experienced haskeller does it, but I'm afraid it also took me quite a few hours, which is why the following list of steps goes on and on.

After all those hours, my opinion is that Pipes is not at all an easy library. I don't know if Conduit is any easier, but otherwise I side with your friend in advising to use something else (perhaps ordinary lazy IO?) instead of Pipes.

Anyway, here is the full brain dump of my steps as I tried to get your three snippets to fit together.

  • Since you say that you have a hard time combining the snippets, I assume that they must have complicated types. So my first goal is to figure out the type of your first snippet.
  • hoogle for parseUrl, withManager, etc. No results.
  • Google for haskell runEffect, find that it's a method from Pipes.Core, a module of the pipes package
  • cabal install pipes
  • Write a small program importing Pipes.Core giving the name getHttpStream to your first snippet. parseUrl is not in scope, among others.
  • At this point I guess that withManager, withHTTP, etc. are probably members of other packages in the Pipes ecosystem.
  • Google for withHTTP pipes, discover that I need Pipes.HTTP from the pipes-http package.
  • cabal install pipes-http
  • Now (>->) is not in scope. I thought it was monad composition, so I imported Control.Monad, and when it didn't work I looked up Control.Monad on hoogle and discovered that I was confusing (>->) with (>=>).
  • hoogle for (>->). Probably another piece of the Pipes ecosystem.
  • Google for ">-> pipes", but of course google is no good for identifiers which consists only of symbols.
  • I remember that hayoo is supposed to support more packages than hoogle, so I search it for "(>->)", but the results don't seem related to my query.
  • I guess that (>->) is probably pipe composition, so it's probably in the core pipes package, just not in Pipes.Core. I quickly find it in Pipes.
  • Next, PB.stdout is not in scope. At that point I notice that the parseUrl snippet is actually at the top of the Pipes.HTTP documentation, from which I discover that PB is Pipes.ByteString.
  • cabal install pipes-bytestring
  • The first snippet finally compiles! I can finally ask ghci about its type.

    $ cabal repl
    > :load Main.hs
    > :t getHttpStream
    getHttpStream :: IO ()
  • Okay, so the type isn't as complicated at all. What happens when I run this?

      > getHttpStream
  • Okay, so it fetches a webpage and prints it.

  • What do we want to do next? Turning the binary stream into text and splitting the text into lines? Clearly, since the first snippet has type IO () and not IO ByteString, we're not going to be able to do anything to the binary stream. So my next goal is to modify the first snippet so that it has type IO ByteString.
  • I add the type annotation getHttpStream :: IO PB.ByteString and look at the error messages. withManager returns the wrong type, but I can't tell if it's because it is a polymorphic function and its body has the wrong type, or if it's because it always returns IO ().
  • Looking for withManager in the documentation for Pipes.Core and Pipes. It's not there.
  • No matter, I can just ask ghci.

      :t withManager
      withManager :: ManagerSettings -> (Manager -> IO a) -> IO a
  • Great, it's the body which has the wrong type. Same question for withHTTP:

      :t withHTTP
        :: Request
           -> Manager
           -> (Response (Producer PB.ByteString IO ()) -> IO a)
           -> IO a
  • Okay, we're getting close: the body receives a something-something-ByteString and returns an IO a, I just need to somehow extract the ByteString from the something-something-ByteString and return that instead of (). Still, while we're at it, what's the type of runEffect?

      :t runEffect
      runEffect :: Monad m => Effect m r -> m r
  • No idea what this Effect thing is, but m r is definitely becoming IO (), so I should probably try to modify the body which is being passed to runEffect so that it has type Effect IO ByteString instead of Effect IO (). I think this is more likely to work than to try to extract a ByteString from the something-something-ByteString, because the fact that runEffect exists and is used in this example is a sign that it's probably not possible to access the ByteString without it.

  • At this point, I predict that responseBody resp probably has type Effect IO ByteString, that PB.stdout probably has type Effect IO (), and that >-> feeds one into the other. Except this can't really work, because the type of PB.stdout would have to specify that it expects a ByteString somehow. Maybe ByteString -> Effect IO ()? But in that case, >-> would be a strange choice of name, since it looks closer to (>=>) than to (>>=). Anyway, let's look at the types of those three pieces and figure out what's going on:

      :t responseBody (undefined :: Response (Producer PB.ByteString IO ()))
      responseBody (undefined :: Response (Producer PB.ByteString IO ()))
        :: Producer PB.ByteString IO ()
      :t (>->)
        :: Monad m =>
           Pipes.Core.Proxy a' a () b m r
           -> Pipes.Core.Proxy () b c' c m r -> Pipes.Core.Proxy a' a c' c m r
      :t PB.stdout
        :: MonadIO m => Pipes.Core.Proxy () PB.ByteString y' y m ()
  • Wow, those are some pretty long types, and Effect m r is nowhere to be found. Maybe Effect is a type synonym?

      :info Effect
      type Effect = Pipes.Core.Proxy X () () X
              -- Defined in `Pipes.Core'
  • Yup, it's a type synonym, and I bet Producer is one too:

      :info Producer
      type Producer b = Pipes.Core.Proxy X () () b
              -- Defined in `Pipes.Core'
  • Okay, this is a bit tricky, because there are so many type variables. Let's see. We want an Effect IO ByteString, which desugars toProxy X () () X IO ByteString, and we have a Producer ByteString IO (), which desugars to Proxy X () () ByteString IO (). I don't know what those mysterious Xs are for, but clearly two of those type variables must be input and output types. We know that (>->) pipes the output of responseBody resp into the input of PB.stdout, and we know that the intermediate type is ByteString, so looking at the position where ByteString occur in the type of those two expressions, I conclude that Proxy's second type variable is the input type, and the fourth type variable is the output type.

  • The last two type variables m and r are clearly the monad under which the Proxy objects execute the computations they represent, and the type of the value returned by the entire pipeline once it has completed.
  • I think I need a value of type Proxy X ByteString () X IO ByteString. This would be a computation which consumes a ByteString as input and, while it wouldn't produce any output which could be fed to another Proxy, it would cause the pipeline to terminate with that ByteString as a final result. If only hoogle supported Pipes, then I could just search for this type...
  • Double-checking the type of (>->), I don't think this could work after all, because (>->) doesn't allow the r to be changed, and the Response object forces r to be (). Maybe we are supposed to do everything inside the Pipes ecosystem, without ever returning values to the IO level? I guess I should read the documentation.
  • Reading the few lines of hackage documentation for the Proxy type.
  • Oooh, nice! Having input and output types for both upstream and downstream makes a lot of sense to me, because I came up with a similar design a few months ago. I wonder if Gabriel is also using a swapping semantics instead of independent awaitUp and yieldUp commands?
  • Reading further down, the (~>) and (>~) operators catch my eye, as they allow the result type r to be changed.
  • Okay, I think I've got it. The type of await is Consumer' a m a, which expands to Proxy () a y' y m a, which almost matches the Proxy X ByteString () X IO ByteString I am looking for. The first type variable is a () instead of an X, but I'm not even sure why I need an X in the first place, I just copied it from the signature of Effect.
  • Now that I see in the documentation that this X stands for V.Void, I am doubly certain that I got it. In my swapping scheme, Void was used to indicate the ends of the pipeline, and () was used when I wanted a swap to be interpreted as a unidirectional transfer: if one side offers a () in exchange for a value, the end result is that a value was transmitted to that side. Since I want to receive the ByteString from the Response object, I definitely want a () here.
  • The only nagging detail left is... do I want (~>) or (>~)? Their documentation helpfully point out that I can combine a Producer with an Effect, an Effect with a Consumer, etc., but they don't mention my case, combining Producer with a Consumer'.
  • Okay, I probably don't want (~>), as it is composing functions instead of Proxys.
  • Whatever, with all of those type synonym and with the polymorphism of await (It's a Consumer', not a Consumer), there is probably a case which covers the composition I want. Let's try it!
  • Changing >-> PB.stdout to >~ await.

      Couldn't match type `PB.ByteString' with `X'
          Expected type: Response (Pipes.Core.Proxy X () () X IO ())
            Actual type: Response (Producer PB.ByteString IO ())
      In the first argument of `responseBody', namely `resp'
  • Er, that's not the part of the code which I expected to cause a type error.

  • Adding the type annotations (responseBody resp :: Producer PB.ByteString IO ()) and (await :: Consumer' PB.ByteString IO PB.ByteString).

      Couldn't match type `PB.ByteString' with `X'
      Expected type: Pipes.Core.Proxy X () () X IO ()
        Actual type: Producer PB.ByteString IO ()
      In the first argument of `(>~)'
      Couldn't match type `PB.ByteString' with `()'
      Expected type: Pipes.Core.Proxy () () () X IO PB.ByteString
        Actual type: Pipes.Core.Proxy
                       () PB.ByteString () X IO PB.ByteString
      In the second argument of `(>~)'
  • Okay, now I need to figure out whether this means I need to tweak things a bit more, or if I am going in the wrong direction altogether.

  • Expanding the Producer type synonyms in order to better understand the type error.

      Couldn't match type `PB.ByteString' with `X'
      Expected type: Proxy X () () X             IO ()
        Actual type: Proxy X () () PB.ByteString IO ()
      In the first argument of `(>~)'
      Couldn't match type `PB.ByteString' with `()'
      Expected type: Proxy () ()            () X IO PB.ByteString
        Actual type: Proxy () PB.ByteString () X IO PB.ByteString
      In the second argument of `(>~)'
  • This error message is telling me that (>~) isn't doing what I want at all. It doesn't want the first argument to send its ByteString down the pipeline, and it doesn't want the second argument to receive it!

  • Re-reading the documentation for (>~).
  • "loops over p replacing each await with draw"? Well, the right-hand side consists of exactly one await, so I guess what I did was the same as if I had removed the >-> PB.stdout altogether.
  • Okay, time for a new approach. Since both computations return an r, I wonder which of the two values is used when they are composed with (>->)?
  • Adding the OverloadedStrings extension, replacing responseBody resp with fmap (const "foo") (responseBody resp), and replacing (>~) with (>->).
  • Finally, this typechecks! I have a computation of type IO PB.ByteString!
  • Running the computation yields the source of, no "foo" to be seen. Still, this can't be the canonical way to do this; what if the return type was a value which was difficult to construct? Let's try doing everything inside the Pipes system instead.
  • In particular, everything including the output should be done inside the Pipes system. Since we're going to output something other than a single ByteString, I need something more general than PB.stdout.
  • Reverting to the initial version, the one with >-> PB.stdout and type IO ().
  • Since Proxy is a monad transformer, it shouldn't be too hard to create a custom Proxy computation which receives and prints any kind of input.

      printResult :: Show a => Consumer a IO ()
      printResult = do
          x <- await
          lift $ print x
  • Replacing PB.stdout with printResult. The string representation of the source of is printed. Good!

  • Okay, now that I can plug in arbitrary IO code inside a Proxy pipeline, I feel like the rest of this exercise is going to be a walk in the park.
  • Next snippet: Pipes.Text.Encoding.decodeUtf8. I probably need to install yet another package. Based on the name of the previous packges, I guess this one is called "pipes-text"?
  • cabal install pipes-text
  • Looks good. What is the type of this new snippet?

      :t decodeUtf8
        :: Monad m =>
           Pipes.Core.Producer ByteString m r
           -> Pipes.Core.Producer Text m (Pipes.Core.Producer ByteString m r)
  • Weird, I would have expected some kind of transducer accepting a ByteString and producing a Text, not a function returning a producer of producers. Let's look at the documentation.

  • Oh my, the documentation talks about lenses now. Sounds like the Pipes ecosystem requires quite a bit of prior knowledge from its users!
  • "[...] if the first bytes are already un-decodable, the whole ByteString producer will be returned". I'm pretty sure that won't happen, so I want to discard the Producer ByteString returned by the Producer Text. That's what the fmap (const ()) you've been told to use must be doing.
  • Wait, but what happens if all bytes are decodable? Does it return a producer which doesn't produce any bytes? I would have expected to receive an Either (Producer ByteString IO r) r, not unconditionally receiving a producer. Maybe in case of success, decodeUtf8 doesn't return at all?
  • Let's test this.

      asUtf8 :: Producer PB.ByteString IO ()
             -> Producer Text          IO ()
      asUtf8 p = do
          _ <- decodeUtf8 p
          lift $ putStrLn "decodeUtf8 has returned"
  • Replacing responseBody resp with asUtf8 (responseBody resp). The string "decodeUtf8" is not printed.

  • The monadic semantics of Proxy seem really weird. I knew it was possible for monads such as Maybe to abort in case of failure, but now the computation is aborting in case of success? What?
  • Maybe the upstream computation doesn't execute until the end in the case where the downstream computation doesn't force it to continue evaluating (pull model). Let's test this hypothesis:

      produce3 :: Producer Int IO ()
      produce3 = do
          lift $ putStrLn "producing 1."
          yield 1
          lift $ putStrLn "producing 2."
          yield 2
          lift $ putStrLn "producing 3."
          yield 3
          lift $ putStrLn "done producing."
      consume2 :: Consumer Int IO ()
      consume2 = do
          lift $ putStrLn "consuming once:"
          x <- await
          lift $ printf "received %d. consuming twice:\n" x
          y <- await
          lift $ printf "received %d. done consuming.\n" y
      -- |
      -- >>> main
      -- consuming once:
      -- producing 1.
      -- received 1. consuming twice:
      -- producing 2.
      -- received 2. done consuming.
      main :: IO ()
      main = runEffect (produce3 >-> consume2)
  • Youhoo! Not only was my hypothesis correct, but this compiled the first time. I'm getting the hang of the Pipes library!

  • Alright, last snippet: Pipes.Text.lines. What is its type?

      :t Pipes.Text.lines
        :: (Monad m, Functor f, Profunctor p) =>
           p (FreeT (Pipes.Core.Producer Text m) m r) (f (FreeT
                                                            (Pipes.Core.Producer Text m) m r))
           -> p (Pipes.Core.Producer Text m r) (f (Pipes.Core.Producer
                                                     Text m r))
  • Free monads and profunctors?! Just as I thought the Pipes library wasn't that hard...

  • I think I know what FreeT and Profunctor do, but let's just read the documentation, in case they already simplify the type for common cases.
  • Okay, looks like I just need to apply view to Pipes.Text.Lines in order to obtain what I hope is a Pipe. But where is view defined?
  • The example imports Lens.Family, which is probably from one of the lens packages. But which one?
  • google "Lens.Family", discover that one of the lens packages is called "lens-family". Probably this one :)
  • cabal install lens-family

      :t view Pipes.Text.lines
      view Pipes.Text.lines
        :: Monad m =>
           Pipes.Core.Producer Text m r
           -> FreeT (Pipes.Core.Producer Text m) m r
  • I don't understand why all those Pipes primitives are functions which return producers and stuff. In order to be composed inside a Proxy pipeline, shouldn't almost everything be a Proxy something a something b m r instead of an a -> something Producer b?

  • Okay, how do I convert this FreeT into a [Text]? I know that a free monad is a tree obtained by nesting occurrences of a given functor, which in this case is Producer Text m, I guess. So we have a tree of producers, wow.
  • Oh, wait, but this isn't just a free monad, it is a free monad transformer. So, in addition to the abilities of the inner monad, it can also do, er... what's the computational interpretation of a free monad?
  • Maybe it would be easier to figure out the container interpretation of a monad transformer. Maybe just container composition? runFreeT has type FreeT f m a -> m (FreeF f a (FreeT f m a)), so it's an m-shaped container of FreeF something values, which are themselves f-shaped containers of FreeT values, which is what we're trying to understand in the first place. I think it just means that the m and f alternate: FreeT f m a ~ m (f (m (f ... a))).
  • Okay, so in our case the f is a producer, and the m is IO. So I should be able to run an IO action which returns a producer, produce its values until it returns another IO action, from which I can obtain the next producer, and so on.
  • Let's write a utility function to do just that:

      free2list :: Monad m
                => FreeT (Producer b m) m r
                -> Producer b m r
      free2list freeProducers = do
          freeProducer <- lift $ runFreeT freeProducers
          case freeProducer of
            Pure r -> return r
            Free p -> do nextProducers <- p
                         free2list nextProducers
  • I must admit that it took me a while to get the above to compile. In the end, it is replacing the abstract m, b and r variables with concrete types which made the error messages much easier to understand, and allowed me to complete the implementation.

  • Replacing asUtf8 (responseBody resp) with free2list (view lines (asUtf8 (responseBody resp))). Now the output is only the first line of, which probably means that my line splitting is okay, but my pretty printing is not, because printResult stops after the first result.
  • How should printResult know how many lines to fetch? What happens if it fetches too much? Let's find out.

      printAllResults :: Show a => Int -> Consumer a IO ()
      printAllResults i = do
          x <- await
          lift $ printf "%d %s\n" i (show x)
          printAllResults (i + 1)
  • When too many lines are being fetched, decodeUtf8 does return, even though the text didn't contain any invalid byte sequences.

      48 [...]
      49 "</body>"
      50 "</html>"
      decodeUtf8 has returned
  • I still don't know how to distinguish between decodeUtf8 returning a producer because there was an error, and returning a producer because all the bytes were successfully parsed. In case of success, I guess the producer will not produce any more bytes, but how do we check this? So far, the only thing we have done with producers is to call them, causing them to produce, but not to return their values. What happens if we try to await a value from the producer inside the monadic code?

      asUtf8 :: Producer PB.ByteString IO ()
             -> Producer Text          IO ()
      asUtf8 p = do
          p' <- decodeUtf8 p
          p' >-> do _ <- await
                    lift $ putStrLn "decodeUtf8 couldn't parse the text"
  • I run the resulting producer, and I try to await a value from it. If I succeed, it means it was non-empty, so there was an error. It works!

Here is the full code:

module Main where

import Prelude hiding (lines)
import Lens.Family
import Pipes
import Pipes.HTTP
import Pipes.Text
import Pipes.Text.Encoding
import qualified Pipes.ByteString as PB

getHttpStream :: IO ()
getHttpStream = do
  req <- parseUrl ""
  withManager tlsManagerSettings $ \m ->
    withHTTP req m $ \resp -> do
        runEffect $ free2list (view lines (asUtf8 (responseBody resp)))
                >-> printAllResults 1

asUtf8 :: Producer PB.ByteString IO ()
       -> Producer Text          IO ()
asUtf8 p = do
    p' <- decodeUtf8 p
    p' >-> do _ <- await
              lift $ putStrLn "decodeUtf8 couldn't parse the text"

free2list :: Monad m
          => FreeT (Producer b m) m r
          -> Producer b m r
free2list freeProducers = do
    freeProducer <- lift $ runFreeT freeProducers
    case freeProducer of
      Pure r -> return r
      Free p -> do nextProducers <- p
                   free2list nextProducers

printResult :: Show a => Consumer a IO ()
printResult = do
    x <- await
    lift $ print x

printAllResults :: Show a => Int -> Consumer a IO ()
printAllResults i = do
    x <- await
    lift $ putStrLn $ show i ++ " " ++ show x
    printAllResults (i + 1)

main :: IO ()
main = getHttpStream >>= print

Can you explain what your program was supposed to do?


This is awesome detective work. It took me much longer to figure out Gabriel's way of thinking! One missing ingredient that would have made the task easier is pipes-group which has an excellent tutorial. It defines free2list as concats. There is one defect in the program you end up with has to do with just this; here is a simple variant, with slightly different use of combinators and a comment explaining:


I should say, I took you to be wanting to prefix each line with its line number.

gelisam commented Jun 16, 2014

My goal was to combine the OP's snippets together, with no particular concern for the output, but yes, the intent of the full code was to prefix each line with its line number. Thanks for fixing and explaining my defect!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment