Skip to content

Instantly share code, notes, and snippets.

@ssanj
Last active March 26, 2020 12:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ssanj/9e9d84732ccdc9be4892257c5e70121a to your computer and use it in GitHub Desktop.
Save ssanj/9e9d84732ccdc9be4892257c5e70121a to your computer and use it in GitHub Desktop.
import fs2.Stream
import fs2.Pipe
import fs2.Chunk
import fs2.Pull
import cats.effect.IO
import scala.language.higherKinds
//implementing scan from https://fs2.io/guide.html#exercises-stream-transforming
//Stream.range(1,10).scan(0)(_ + _).toList // running sum
// res38: List[Int] = List(0, 1, 3, 6, 10, 15, 21, 28, 36, 45)
object Scan {
def scan[F[_], O, A](s: Stream[F, O])(init: A, f: (A, O) => A): Stream[F, A] = {
def go(s1: Stream[F, O], acc: A): Pull[F, A, Unit] = {
s1.pull.uncons.flatMap {
case Some((chunk, next)) =>
if (chunk.isEmpty) go(next, acc)
else {
val first = chunk.apply(0) //safe because chunk is not empty
val remaining = Stream.chunk(chunk.drop(1)) ++ next
val result = f(acc, first)
Pull.output(Chunk(acc)) >> go(remaining, result)
}
case None => Pull.output(Chunk(acc)) >> Pull.done
}
}
go(s, init).stream
}
}
@ssanj
Copy link
Author

ssanj commented Mar 25, 2020

usage:

import fs2.Stream
import fs2.Pipe
import fs2.Chunk
import fs2.Pull
import cats.effect.IO
import scala.language.higherKinds
import net.ssanj.fs2exercises.Scan._

val s1 = Stream.range(1,10)
scan[fs2.Pure, Int, Int](s1)(0, _ + _).toList

@ssanj
Copy link
Author

ssanj commented Mar 25, 2020

How can we make this implementation better?

@ssanj
Copy link
Author

ssanj commented Mar 26, 2020

using uncons1 as recommended by @ajfitzpatrick:

  def scan2[F[_], O, A](s: Stream[F, O])(init: A, f: (A, O) => A): Stream[F, A] = {
    def go(s1: Stream[F, O], acc: A): Pull[F, A, Unit] = {
      s1.pull.uncons1.flatMap {
        case Some((element, next)) =>
          val result = f(acc, element)
          Pull.output(Chunk(acc)) >> go(next, result)
        case None => Pull.output(Chunk(acc)) >> Pull.done
      }
    }

    go(s, init).stream
  }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment