@calvinlfer
Last active April 19, 2024 14:18
Developing an understanding of the FS2 Pull datatype

In Pull[F, O, R], R is the result type. A Pull represents a computation that may emit values of type O on the Stream and then finishes by returning a result of type R. To convert a Pull to a Stream, R must be Unit, because an FS2 Stream cannot terminate with a return value.
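
As a minimal sketch of that constraint (assuming fs2 3.x on the classpath; the object and value names are made up for illustration), a Pull whose result is Unit converts directly with .stream, while a Pull that returns something else has to discard its result first, here with .void:

```scala
import fs2.{Pull, Pure, Stream}

object PullToStreamSketch {
  // Emits 1 and then 2; the result type R is Unit, so .stream is available.
  val unitPull: Pull[Pure, Int, Unit] = Pull.output1(1) >> Pull.output1(2)
  val fromUnit: Stream[Pure, Int] = unitPull.stream

  // Emits 1 and returns a String result. Calling .stream here would not
  // compile, because R is String rather than Unit.
  val withResult: Pull[Pure, Int, String] = Pull.output1(1) >> Pull.pure("done")

  // Discarding the result turns R into Unit, which makes .stream legal again.
  val fromVoided: Stream[Pure, Int] = withResult.void.stream
}
```

The emitted O values survive the conversion; only the result R is dropped.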

See here for the conversation

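  • Stream[F, O] is monadic over O, the output values it emits, so flatMap on a Stream works on each emitted element
  • Pull[F, O, R] is monadic over R, the result, so flatMap on a Pull works on what the previous Pull returned, which is what makes stateful transformations possible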
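
The difference between the two flatMaps can be sketched side by side. This is only an illustration, assuming fs2 3.x; the object and value names are hypothetical:

```scala
import fs2.{Pull, Pure, Stream}

object MonadicSketch {
  // Stream.flatMap is called once per emitted element (O).
  val expanded: List[Int] =
    Stream(1, 2).flatMap(i => Stream(i, i * 10)).toList

  // Pull.flatMap is called with the result (R) of the previous Pull.
  // For uncons1 that result is an Option of (head, remaining stream).
  val headOnlyPull: Pull[Pure, Int, Unit] =
    Stream(1, 2, 3).pull.uncons1.flatMap {
      case Some((head, _)) => Pull.output1(head) // emit the head, ignore the tail
      case None            => Pull.done          // empty input: emit nothing
    }

  val headOnly: List[Int] = headOnlyPull.stream.toList
}
```

Here expanded is List(1, 10, 2, 20) because every element is mapped to a small stream, while headOnly is List(1) because the Pull only ever looks at the single result of uncons1.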

If you look closely at what we did when writing our own version of myTake, we were using that R value while emitting elements on the stream (the O values).

Let's take a closer look at our myTake combinator:

import fs2.{Pipe, Pull, Stream}

def myTake[F[_], I](nrOfElements: Long): Pipe[F, I, I] = { (inputStream: Stream[F, I]) =>

  // Pull one element at a time, carrying the number of elements still wanted.
  def go(stream: Stream[F, I], remaining: Long): Pull[F, I, Unit] =
    if (remaining <= 0) Pull.done
    else stream.pull.uncons1.flatMap {
      case None => Pull.done // upstream is exhausted
      case Some((element, remStream)) => Pull.output1(element) >> go(remStream, remaining - 1)
    }

  go(inputStream, nrOfElements).stream
}

We use the Pull[F, O, R] API, which is monadic over R. This looks a bit deceiving in go: because go returns a Pull whose R is Unit, it appears that we never use the R type. That is not the entire picture.

Let's take a closer look at uncons1

This is interesting: uncons1 is a Pull[F, Nothing, Option[(O, Stream[F, O])]]. Notice that it emits nothing (O = Nothing) and that everything is in the return type (R). When we map or flatMap (which is what we have been doing), we get access to that R value.

We use Pull.output1 to emit O values on the Stream, and we also need to transfer some state from one step to the next (in our case, the remaining stream and the number of elements left to pull). The remaining stream arrives in the R type, and the counter is passed along as an argument to the recursive call. Because the Pull data type is monadic on R, R is what lets us transfer state between Pulls.

So the pattern is: pull the stream, use unconsX to get the remaining Stream data (and any other state you need) in the R type, do some stateful work, decide whether to emit data as O values using Pull.outputX, and then use flatMap to carry any updated state through to the next Pull. The >> operator is flatMap that ignores its argument, so a >> b is shorthand for a.flatMap(_ => b). This process happens recursively.

We finish with Pull.done when there is no more work to do. Its R is Unit, which indicates that there is no result left to pass on. Once the whole computation is a Pull[F, O, Unit], we can convert it into an FS2 Stream of type Stream[F, O], which contains all the O elements we emitted during our Pull transformations.
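
The same pattern carries other kinds of state just as well. The sketch below is an illustration only (assuming fs2 3.x; runningSum is a hypothetical pipe, not something fs2 provides under that name): the accumulator plays the role that the remaining counter plays in myTake.

```scala
import fs2.{Pipe, Pull, Pure, Stream}

object RunningSumSketch {
  // Hypothetical pipe: emits the cumulative sum of the input seen so far.
  def runningSum[F[_]]: Pipe[F, Int, Int] = { (in: Stream[F, Int]) =>
    def go(stream: Stream[F, Int], acc: Int): Pull[F, Int, Unit] =
      stream.pull.uncons1.flatMap {
        case None => Pull.done // upstream is exhausted
        case Some((element, rest)) =>
          val next = acc + element
          // Emit the new total (O), then recurse with the updated state.
          Pull.output1(next) >> go(rest, next)
      }

    go(in, 0).stream
  }

  val totals: List[Int] =
    Stream(1, 2, 3, 4).through(runningSum[Pure]).toList
}
```

With the input 1, 2, 3, 4 the totals are List(1, 3, 6, 10). The remaining stream comes from the R value of uncons1, and the accumulator travels as a parameter of go, exactly as the counter does in myTake.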

Note: the Pull.output1 combinator is used to emit elements of type O on the FS2 Stream.
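
Pull.output1 emits a single element, and uncons1 pulls a single element. A chunk-based variant of the same combinator can be sketched with uncons and Pull.output instead. This assumes fs2 3.x; myTakeChunked is a made-up name used only for this illustration:

```scala
import fs2.{Pipe, Pull, Pure, Stream}

object ChunkedTakeSketch {
  // Hypothetical chunk-based version of myTake.
  def myTakeChunked[F[_], I](nrOfElements: Long): Pipe[F, I, I] = { (in: Stream[F, I]) =>
    def go(stream: Stream[F, I], remaining: Long): Pull[F, I, Unit] =
      if (remaining <= 0) Pull.done
      else stream.pull.uncons.flatMap {
        case None => Pull.done // upstream is exhausted
        case Some((chunk, rest)) =>
          // Keep at most `remaining` elements of this chunk.
          val toEmit = chunk.take(math.min(chunk.size.toLong, remaining).toInt)
          Pull.output(toEmit) >> go(rest, remaining - toEmit.size)
      }

    go(in, nrOfElements).stream
  }

  val firstTen: List[Int] =
    Stream.emits(Seq(1, 2, 3)).repeat.through(myTakeChunked[Pure, Int](10)).toList
}
```

The structure is unchanged: the R value of uncons supplies the next chunk and the remaining stream, and Pull.output emits a whole chunk of O values at once.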

Note: the Pull.done combinator signals that there is nothing more to pull from upstream and, consequently, nothing more to emit on the FS2 Stream (its output type O is Nothing). It has nothing to return either (R = Unit), so there is no point in applying any further maps or flatMaps to it (no more monadic transformations remain to be done on the Pull).

calvinlfer commented Aug 16, 2018

Example usage:

Stream
  .emits(Seq(1, 2, 3))
  .repeat
  .through(myTake(10))
  .toList
// res0: List[Int] = List(1, 2, 3, 1, 2, 3, 1, 2, 3, 1)
