@dsebban
Last active April 11, 2024 20:48
Understanding fs2 `Pull`

Understanding the Pull type from fs2

From the scaladocs

class Stream[+F[_], +O] extends AnyVal
  • A stream producing output of type O
  • May evaluate F effects.
class Pull[+F[_], +O, +R] extends AnyVal
  • p: Pull[F,O,R] reads values from one or more streams
  • Returns a result of type R,
  • Produces a Stream[F,O] when calling p.stream.

Questions

How does Pull relate to Stream?

What is O ?

What is R ?

To understand these types, we will compare them to the types described in Chapter 15 of the Red Book: https://github.com/fpinscala/fpinscala/blob/master/exercises/src/main/scala/fpinscala/streamingio/StreamingIO.scala

case class Await[I,O](
        recv: Option[I] => Process[I,O])
      extends Process[I,O]

This is the first, simple version (no effect involved, i.e. no F[_]).

Process[I,O] can be thought of as a function Stream[I] => Stream[O].

From the book: Await(recv) requests a value from the input stream. The driver should pass the next available value to the recv function, or None if the input has no more elements.

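It is basically pulling on the input stream: it requests an element of type I, and then uses that element (or None at the end of the input) to produce the next Process[I,O], i.e. the rest of the Stream[I] => Stream[O] transformation.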
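
To make the pulling concrete, here is a minimal sketch of the simple Process with a driver that feeds it from a List. Emit and Halt follow the shape used in the Red Book; the `run` driver, the `doubler` process and the enclosing `ProcessSketch` object are hypothetical names introduced only for this illustration (the book drives a Process with a Stream instead of a List).

```scala
object ProcessSketch {
  sealed trait Process[I, O]
  // Emit one output value, then continue with `tail`.
  case class Emit[I, O](head: O, tail: Process[I, O]) extends Process[I, O]
  // Request the next input; the driver passes None when the input is exhausted.
  case class Await[I, O](recv: Option[I] => Process[I, O]) extends Process[I, O]
  // Stop: no more requests and no more output.
  case class Halt[I, O]() extends Process[I, O]

  // Hypothetical driver: answers every Await by pulling from `input`.
  // Not stack-safe; fine for small inputs.
  def run[I, O](p: Process[I, O], input: List[I]): List[O] = p match {
    case Halt()      => Nil
    case Emit(h, t)  => h :: run(t, input)
    case Await(recv) =>
      input match {
        case x :: rest => run(recv(Some(x)), rest)
        case Nil       => run(recv(None), Nil)
      }
  }

  // Example process: request an Int, emit its double, repeat until the input ends.
  def doubler: Process[Int, Int] = Await[Int, Int] {
    case Some(i) => Emit[Int, Int](i * 2, doubler)
    case None    => Halt[Int, Int]()
  }
}
```

With these definitions, `ProcessSketch.run(ProcessSketch.doubler, List(1, 2, 3))` evaluates to `List(2, 4, 6)`: each Await pulls one element and each Emit pushes one result.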

Later on, Await is generalized and you get:

  trait Process[F[_],O] {
    import Process._
  }
  object Process {
    case class Await[F[_],A,O](
      req: F[A],
      recv: Either[Throwable,A] => Process[F,O]) extends Process[F,O]
  }

Definition : Process[F,O] represents a stream of O values (O for output) produced by (possibly) making external requests using the protocol F via Await.

Let's get rid of the Either and go back to a simpler version based on Option:

  case class Await[F[_],A,O](
      req: F[A],
      recv: Option[A] => Process[F,O]) extends Process[F,O]

  case class Emit[F[_],O](
      head: O,
      tail: Process[F,O]) extends Process[F,O]

From the FS2 guide (https://functional-streams-for-scala.github.io/fs2/guide.html):

A Stream[F,O] (formerly Process) represents a:

  • Discrete stream of O values
  • Which may request evaluation of F effects.

Let's complete the sentence using the definition of Await:

Which may request evaluation of F effects via Pull

Let's rename the types to the ones we like:

  // Conceptual renaming of Await, not the actual fs2 definition of Pull
  case class Pull[F[_],A,O](
      req: F[A],
      recv: Option[A] => Stream[F,O]) extends Stream[F,O]

  case class Emit[F[_],O](
      head: O,
      tail: Stream[F,O]) extends Stream[F,O]

To request an element in fs2, the uncons1 function is used:

def uncons1: Pull[F, Nothing, Option[(O, Stream[F, O])]]

Waits for a single element to be available in the source stream. The element along with a new stream are provided as the resource of the returned pull. The new stream can be used for subsequent operations, like awaiting again. A None is returned as the resource of the pull upon reaching the end of the stream.

To emit an element :

def output1[F[x] >: Pure[x], O](o: O): Pull[F, O, Unit]

Outputs a single value.

Answers !

Pull can both request via uncons1 and output elements via output1 !
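
As a small illustration of using both in one Pull, the sketch below requests a single element with uncons1 and re-emits it with output1. It assumes fs2 (2.x or 3.x) is on the classpath; `PullExample` and `firstOnly` are hypothetical names chosen for this example, not part of fs2.

```scala
import fs2.{Pull, Stream}

object PullExample {
  // Keep only the first element of the stream, if there is one.
  def firstOnly[F[_], O](s: Stream[F, O]): Stream[F, O] = {
    val pull: Pull[F, O, Unit] =
      s.pull.uncons1.flatMap {
        case Some((head, _)) => Pull.output1(head) // emit the element we just requested
        case None            => Pull.done          // source was empty: emit nothing
      }
    pull.stream // a Pull[F, O, Unit] can be turned back into a Stream[F, O]
  }
}
```

For a pure stream, `PullExample.firstOnly(Stream(1, 2, 3)).toList` gives `List(1)`. The explicit `Pull[F, O, Unit]` annotation is there only to help type inference and to show the type that results from combining the two operations.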

In terms of the types from the Red Book, this means:

Pull merges Process.Emit and Process.Await in one type.

How do you know which state it is in?

State Await: Request an element

uncons1: Pull[F, O = Nothing, R = Option[(O, Stream[F, O])]]

  • O is Nothing because the Pull is in requesting mode: no output is allowed
  • R is the head and tail of Process.Emit zipped together (wrapped in an Option), so when you flatMap on the Pull you get access to the element that was just pulled and to the rest of the stream
  • F is the effect that is evaluated when you flatMap on the Pull to obtain R

State Emit: Output an element

output1[F, O](o: O): Pull[F, O, R = Unit]

  • O is the type of o: we are emitting the value o to the stream
  • R is Unit because we cannot request (Await) in the Emit state

| Process[F,O] | Stream[F,O] |
| --- | --- |
| Await(req: F[A], recv: Option[A] => Stream[F,O]) | Pull[F, O = Nothing, R = Option[(O, Stream[F, O])]] |
| Emit(head: O, tail: Stream[F,O]) | Pull[F, O, R = Unit] |
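
The two states can be seen alternating in a recursive loop. The sketch below takes at most n elements by repeatedly requesting one element (the Await-like state) and then outputting it (the Emit-like state). It assumes fs2 (2.x or 3.x) on the classpath; `PullLoop`, `takeN` and `firstTwo` are hypothetical names used only for illustration.

```scala
import fs2.{Pull, Stream}

object PullLoop {
  // Emit at most n elements of s, one element at a time.
  def takeN[F[_], O](s: Stream[F, O], n: Int): Pull[F, O, Unit] =
    if (n <= 0) Pull.done // nothing left to request or emit
    else
      // Await-like state: O = Nothing, R = Option[(O, Stream[F, O])]
      s.pull.uncons1.flatMap {
        case Some((head, tail)) =>
          // Emit-like state: output head (R = Unit), then go back to requesting
          Pull.output1(head) >> takeN(tail, n - 1)
        case None =>
          Pull.done // source exhausted before n elements were seen
      }

  // Running the loop on a pure stream.
  val firstTwo: List[Int] = takeN(Stream(1, 2, 3, 4), 2).stream.toList
}
```

Here `PullLoop.firstTwo` is `List(1, 2)`. The overall type of the loop is `Pull[F, O, Unit]`: the Nothing output of the requesting step widens to O once it is sequenced with output1, which is how a single Pull type covers both the Await and the Emit roles.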