@ssanj
Created April 20, 2020 14:13

What is a Pull, taken from a Gitter thread.

All comments are by Fabio Labella @SystemFw unless specified.

[1] https://gitter.im/functional-streams-for-scala/fs2?at=5acb7b81109bb04332a39a41

What is a Pull

Right, so, I think a nice way of thinking of effects is as languages. By effects here I mean F[_], like Stream, Pull, IO, but also Option, List or what have you. These languages have some words that are unique to them (e.g. only Stream has uncons), and some that are abstracted in algebras (e.g. flatMap, since they are all monads).

So, what's a Pull? The first thing is that fs2 is a pull-based streaming library, and every pull-based streaming library, even those with complex implementations like fs2, can roughly be understood as an advanced iterator,

where you basically need to tell it "give me the next one" to make things happen.

So let's look at what Pull can do. Pull[F[_], O, R] has 3 params: F[_] is the effect, O is the type of the values emitted, and R is the type of the value returned.

So Pull is a computation that describes emitting 0...n values while evaluating effects in F, and completing with a value of type R.

So, forgetting about Stream for a second: most people understand F and O in Pull very easily, but R is more confusing. What is that for?

so, a Pull can emit some elements and then terminate with a result

That result can be used to mean "the rest". Imagine a function with an accumulator for Lists: you do some stuff, then recur with the rest of the list, and in the end return the final List (and indeed that was the attempt by OP). However, it's a bit different when streaming: you don't want to build up the whole result. As soon as something needs emission, you want to describe the action of emitting it, but you still want the property of "the rest of the stuff", to be able to write recursive functions.
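For comparison, here is a minimal sketch of the List-style recursion with an accumulator described above, in plain Scala with no fs2 involved. The names `takeList` and `go` are made up for illustration. Note that the whole result is built up in `acc` before anything is handed back, which is the part that does not carry over to streaming.

```scala
import scala.annotation.tailrec

object ListTakeSketch {
  // Hypothetical helper: take the first n elements of a List
  // by recursing on "the rest" and accumulating the result.
  def takeList[A](n: Int, xs: List[A]): List[A] = {
    @tailrec
    def go(remaining: Int, rest: List[A], acc: List[A]): List[A] =
      rest match {
        case hd :: tl if remaining > 0 => go(remaining - 1, tl, hd :: acc)
        case _                         => acc.reverse // nothing left to take: return everything at once
      }
    go(n, xs, Nil)
  }

  val firstThree: List[Int] = takeList(3, List(1, 2, 3, 4, 5)) // List(1, 2, 3)
}
```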

So, let's see how to translate these concepts to code. Small simplification: I'm going to ignore Segment for now, and talk about uncons1/output1 and not about uncons/output. This is Pull.output1:

object Pull {
   ....
   def output1[F[_], O](o: O): Pull[F, O, Unit]
}

output1 takes an element, and gives a Pull that describes the concept of emitting that one value. The return type is Unit there. How do we return something instead (so, terminate with a value)? Pull forms a Monad in R:

object Pull {
   ....
   def pure[F[_], R](r: R): Pull[F, Nothing, R]
}

O is Nothing because nothing is emitted, and flatMap takes an R => Pull. Now let's tie in with Stream and it will make more sense, I think.
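A small sketch of output1, pure and flatMap working together, assuming a recent fs2 (3.x) on the classpath rather than the version discussed in the thread. The value names are invented for illustration. It emits two Ints, terminates with a String as R, and then uses flatMap to act on that R.

```scala
import fs2.{Pull, Pure, Stream}

object PullMonadSketch {
  // Emit two values, then terminate with a String result (the R).
  val emitThenReturn: Pull[Pure, Int, String] =
    Pull.output1(1) >> Pull.output1(2) >> Pull.pure("finished")

  // flatMap receives the R of the previous Pull and decides what happens next.
  val useResult: Pull[Pure, Int, Unit] =
    emitThenReturn.flatMap(msg => Pull.output1(msg.length))

  // Only a Pull whose R is Unit converts to a Stream.
  val emitted: List[Int] = useResult.stream.toList // List(1, 2, 8)

  // Dropping the R with void also gives a Pull of Unit.
  val emittedOnly: List[Int] = emitThenReturn.void.stream.toList // List(1, 2)
}
```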

First, Pull describes emission of values with effects, and Stream also describes emission of values with effects, so it seems sensible that we can convert from Pull to Stream (I will get to why Pull exists in the first place, no worries). But Stream doesn't describe returning with a result, so the Pull => Stream conversion can't take any Pull: it will be a Pull[F, O, Unit] => Stream[F, O].

Well, tbf it does take any Pull but discards the result,

but logically you can think of it as doing map(_ => ()) on the Pull, and converting the result (which is a Pull of Unit) to Stream.

Pull is only used to then convert to Stream. Conversely, you've got a Stream

and you want to write a method that "steps through" the Stream, kinda like an iterator. How would you do that?

But yeah, it's like saying Pull.pure(()), so it terminates with Unit, and you get the Pull[F, Nothing, Unit] you expect. Anyway, you want to step through a Stream: we expose a method called uncons and several others (I'm going to use uncons1 for this example, but uncons is the actual one, everything else is implemented by manipulating uncons).

What should uncons1 on Stream return? Well, we need it to return a data type that's capable of expressing this "stepping through", so it returns a Pull:

// on Stream
def uncons1: Pull[F, Nothing, Option[(O, Stream[F, O])]]

(Gavin Bisesi) @Daenyth: So pull is also pull-based like stream but we can use it to do stateful-like actions

(Pull and Stream are actually the same thing, but exposing different APIs, and I'll tell you why.) Yes, uncons1 on a Stream returns a Pull that emits nothing

but returns an Option (is the Stream empty?), containing an element (the next element) and a Stream (the rest of the Stream).
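To make the shape of uncons1 concrete, here is a hedged sketch assuming fs2 3.x: a made-up pipe, `dropFirst`, that steps once through the Stream, ignores the head, and re-emits the rest. fs2 already ships `tail` and `drop` for this; the point is only to show the Option of (element, rest of the Stream) in use.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

object Uncons1Sketch {
  // Hypothetical pipe: skip the first element, emit everything after it.
  def dropFirst[F[_], O]: Pipe[F, O, O] = in => {
    val pull: Pull[F, O, Unit] = in.pull.uncons1.flatMap {
      case None          => Pull.done    // the Stream was empty
      case Some((_, tl)) => tl.pull.echo // emit the rest of the Stream unchanged
    }
    pull.stream
  }

  val rest: List[Int] =
    Stream(1, 2, 3).through(dropFirst[Pure, Int]).toList // List(2, 3)
}
```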

(Gavin Bisesi) @Daenyth: what does it mean for the stream to be empty when it's pull-based

that it emits nothing terminated as you said by manipulating R and recursing

But you'll see it's the same thing, because we are going to recur on Pull, and eventually hit a None, meaning we are done pulling. So let's see how to write take with Pull. The pattern is generally to write recursive functions with accumulators; however, they look a bit different from the ones on List because "the rest of the List" and "the element emitted" are two different things:

the "rest of the List" is a Stream (returned from uncons)

but "the elements emitted" are already expressed by the idea of a Pull. I'm going to write a simplified version of take using uncons1 (the real one uses uncons):

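  import fs2.{Pipe, Pull, Stream}

  def myTake[F[_], O](n: Int): Pipe[F, O, O] = in => {
    // go carries the remaining count and the rest of the Stream
    def go(n: Int, s: Stream[F, O]): Pull[F, O, Unit] =
      if (n <= 0) Pull.done
      else s.pull.uncons1.flatMap {
        case None => Pull.done // input exhausted before n elements were seen
        case Some((hd, tl)) => Pull.output1(hd) >> go(n - 1, tl) // emit one element, recurse on the rest
      }

    go(n, in).stream
  }

  val yo = Stream.range(1, 20).through(myTake(5)).toVector // Vector(1, 2, 3, 4, 5)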

and it's how you normally write functions on Pull

Why do we have both Pull and Stream? Lots of libraries (especially Haskell ones) have Stream take one type parameter for their output, and another type parameter for their result, whereas here we have Stream[F, O], which doesn't have an R, and Pull. The answer is this: Streams can be used for two things. One is actual streaming IO (take things and transform them without accumulating too much in memory); the other is control flow (do ten requests, transform all responses) in a kinda FRP fashion (that's what my talk is about). Now it turns out that these two use cases benefit from different monad instances. For Stream (control flow), you want a Monad in O, so you can say "do this on each emitted value" very conveniently, whereas for Pulling, you want a Monad in R, so you can say "keep recursing" very conveniently.

So whereas other libraries (e.g. Haskell pipes) have a monad in the return type, and other operations are expressed differently (for, yield, unhelpfully named like Scala's for comprehension), we have two different types, each exposing a monadic API that's very convenient for its specific use case. Note that Pull and Stream actually have the same internal representation; they are almost like newtypes.
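As a tiny illustration of the Monad in O, assuming fs2 3.x: flatMap on Stream runs a function on each emitted value and concatenates the resulting Streams, much like flatMap on List. The value name is made up.

```scala
import fs2.Stream

object StreamFlatMapSketch {
  // "Do this on each emitted value": every element expands into a small Stream.
  val expanded: List[Int] =
    Stream(1, 2, 3).flatMap(i => Stream(i, i * 10)).toList // List(1, 10, 2, 20, 3, 30)
}
```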

also note that the Stream api (implemented with Pull) is rich enough that you don't need Pull all that often, but it's a useful concept to know to master the library
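One such Stream combinator is scan, which threads state through a Stream without any hand-written Pull recursion. A minimal sketch, assuming fs2 3.x, computing a running sum (scan emits the initial value first):

```scala
import fs2.Stream

object ScanSketch {
  // State (the running total) is carried by scan, not by explicit recursion.
  val runningSum: List[Int] =
    Stream(1, 2, 3, 4).scan(0)(_ + _).toList // List(0, 1, 3, 6, 10)
}
```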

Using scan is actually easier than recursion in the long term (for most but not all cases). The other thing is that the guide is operating on segments, and not on single elements:

this is the uncons1 / uncons simplification

Yeah, that's the main reason, but it's better since Segment supports fusion for all its operations, so the end result is way more performant.

Stream/Pull/Segment are the three main concepts in the library, alongside the async package. However, note you don't interact with those three in the same ratio: I'd say more like 80/15/5.

(Justin Heyes-Jones) @justinhj Is uncons1 the only primitive that lets you look at the actual O values as you walk through the stream? because uncons for example gives you a segment of O's not an actual O

yeah but that's because you should operate on Segment[O] instead of Os, if you can

since operating on Segment allows you to apply operations in batches, which results in vastly increased performance

[2] https://gitter.im/functional-streams-for-scala/fs2?at=5b757c77a37112689c2a6d13

(Calvin Lee Fernandes) @calvinlfer hey guys, I have a question about the Pull datatype, what exactly does the R type represent? Can you think of it as some kind of Resource that you can use to produce Os?

No, it's the return type. You can imagine it as a computation that emits some values and returns a new thing. It's actually similar to pagination ;) which is indeed pull-based.

Calvin Lee Fernandes @calvinlfer Aug 16 2018 23:33 so there's two output types? O and R?

R is not an output type, it's a return type. Note that to convert a Pull to a Stream, R needs to be Unit,

i.e. a Stream doesn't have the ability to terminate with a value.

so R is used to transfer some state between Pulls but it's not used in Stream

Pull and Stream are two "moral" newtypes over Algebra. The Stream monad instance (which is on O) is better for control flow; the Pull monad instance (which is on R) is better for stateful combinators (so for the actual "streaming" aspect). You can obviously write a combinator in Pull by recursing and "transferring some state", as you say, and once you're done you convert to Stream. That's how Stream's stateful combinators are done.

Look at uncons on Pull: it returns Pull[F, Nothing, Option[(Chunk[O], Stream[F, O])]],

so uncons doesn't emit anything, but returns the first chunk of elements and the rest of the Stream, similar to list uncons. Then you flatMap to get that:

you can Pull.output on the chunk to emit it, and you have the Stream to decide what to do next (recursively).
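A hedged sketch of that chunk-based pattern, assuming fs2 3.x: a take-like pipe written with uncons and Pull.output instead of uncons1 and output1. The name `takeChunked` is invented here; it follows the same general shape as the take example in the fs2 guide, but it is not the library's own implementation.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

object ChunkedTakeSketch {
  // Hypothetical pipe: emit at most n elements, working a whole Chunk at a time.
  def takeChunked[F[_], O](n: Long): Pipe[F, O, O] = in => {
    def go(s: Stream[F, O], remaining: Long): Pull[F, O, Unit] =
      if (remaining <= 0) Pull.done
      else s.pull.uncons.flatMap {
        case None => Pull.done // no more chunks
        case Some((hd, tl)) =>
          if (hd.size <= remaining) Pull.output(hd) >> go(tl, remaining - hd.size) // emit the whole chunk, recurse
          else Pull.output(hd.take(remaining.toInt))                               // emit part of the chunk and stop
      }
    go(in, n).stream
  }

  val withinOneChunk: List[Int] =
    Stream(1, 2, 3, 4).through(takeChunked[Pure, Int](2)).toList // List(1, 2)

  val acrossChunks: List[Int] =
    (Stream(1, 2) ++ Stream(3, 4, 5)).through(takeChunked[Pure, Int](3)).toList // List(1, 2, 3)
}
```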

Calvin Lee Fernandes @calvinlfer Aug 16 2018 23:39 so is it right to think of R to transfer state between Pulls?

Pull-based Streams are iterators on steroids. Yeah, in most cases that's how it's used.

Really focus on the types on Pull, and try writing some stuff with them. Note that in Haskell most streaming libraries have 3 type params, one for output, one for result, and that forces you to pick one as the "blessed" one for your Monad instance. But different scenarios work well with different instances. As I said, an instance on O makes Stream.flatMap behave like a List, and it's great for control flow; an instance on R makes it behave like an iterator, and it's great for stateful combinators and streaming. fs2 separates that into 2 types (Stream and Pull) that wrap the same Algebra (so there's only one interpreter for both Stream and Pull). I was confused as to why we needed both at first, but this is actually one of my favourite design decisions in the whole of fs2.

Calvin Lee Fernandes @calvinlfer Aug 16 2018 23:42 so in the case of pagination, would you use that R type? to return the next page to the next Pull and then emit?

So you can do it like that. However, Stream exposes higher-level combinators over Pull: for example, unfoldEval is perfect for pagination, and I recommend you use Stream as your first choice.
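A minimal sketch of pagination with unfoldEval, assuming fs2 3.x with cats-effect 3. The `Page` type and `fetchPage` function are hypothetical stand-ins for a real paged API. The state threaded through unfoldEval is the next page number, if there is one.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

object PaginationSketch {
  // Hypothetical page shape: some items plus the number of the next page, if any.
  final case class Page(items: List[String], next: Option[Int])

  // Hypothetical fetch: pretends there are exactly three pages.
  def fetchPage(n: Int): IO[Page] = IO {
    if (n < 3) Page(List(s"item-$n-a", s"item-$n-b"), Some(n + 1))
    else Page(List(s"item-$n-a"), None)
  }

  // Each step fetches one page, emits it, and carries the next page number as state.
  val pages: Stream[IO, Page] =
    Stream.unfoldEval[IO, Option[Int], Page](Option(1)) {
      case None    => IO.pure(None) // no next page: the Stream ends
      case Some(n) => fetchPage(n).map(p => Some((p, p.next)))
    }

  val items: Stream[IO, String] = pages.flatMap(p => Stream.emits(p.items))

  // List(item-1-a, item-1-b, item-2-a, item-2-b, item-3-a)
  val all: List[String] = items.compile.toList.unsafeRunSync()
}
```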
