Scala has a unique streaming library called FS2 (Functional Streams for Scala). This library embodies all the advantages of functional programming (FP). By understanding its design goals you will get exposure to the core ideas that make FP so appealing.
FS2 has one central type:
Stream[Effect,Output]
You might guess from this type that it is a Stream and that it emits values of type Output.
The obvious questions here are: what is Effect, and what advantages does FS2 have over other streaming libraries?
I will start by reviewing what problems FS2 solves and then compare List and Stream with several code examples.
After that I will focus on how to use Stream with a DB or any other IO. That is where FS2 shines and where the Effect type is used.
Once you understand what Effect is, the advantages of functional programming should be evident to you.
At the end of the post you will have answers to the following questions:
- What problems can I solve with FS2?
- What can I do with Stream that List cannot?
- How can I feed data from an API/File/DB to Stream?
- What is this Effect type and how does it relate to functional programming?
Note: The code is in Scala and should be understandable even without prior knowledge of the syntax.
- Streaming I/O: Incrementally loading big data sets that would not fit in memory and operating on them without blowing your heap.
- Control Flow (not covered): Moving data from one or several DBs/files/APIs to others in a nice declarative way.
- Concurrency (not covered): Running different streams in parallel and making them communicate, for example loading data from multiple files and processing them concurrently as opposed to sequentially. You can do some advanced stuff here, as streams can communicate during the processing stage and not only at the end.
List is the most well-known and widely used data structure. To get a feel for how it differs from an FS2 Stream, we will go through a few use cases and see how Stream can solve problems that List cannot.
Let's say you have a very big file (40GB) fahrenheit.txt with a temperature on each line and you want to convert it to celsius.txt.
import scala.io.Source
val list = Source.fromFile("testdata/fahrenheit.txt").getLines.toList
java.lang.OutOfMemoryError: Java heap space
java.util.Arrays.copyOfRange(Arrays.java:3664)
java.lang.String.<init>(String.java:207)
java.io.BufferedReader.readLine(BufferedReader.java:356)
java.io.BufferedReader.readLine(BufferedReader.java:389)
List fails miserably because the file is too big to fit in memory. If you are curious, you can check the full solution using Stream here, but do that later; read on.
Let's say I succeeded in reading my file and I want to write it back. I would like to preserve the line structure, so I need to insert a newline character \n after each temperature.
I can use the intersperse combinator to do that:
import fs2._
Stream(1,2,3,4).intersperse("\n").toList
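To tie intersperse back to the temperature use case, here is a minimal sketch on a pure, in-memory stream. The two sample values and the names fahrenheitLines and fahrenheitToCelsius are made up for illustration; a real program would read the lines from the file instead.

```scala
import fs2.Stream

// Hypothetical in-memory sample standing in for the lines of fahrenheit.txt
val fahrenheitLines = Stream("32.0", "212.0")

// Standard Fahrenheit to Celsius conversion
def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * 5.0 / 9.0

val celsiusText: String =
  fahrenheitLines
    .map(_.toDouble)          // parse each line
    .map(fahrenheitToCelsius) // convert the temperature
    .map(_.toString)          // back to text
    .intersperse("\n")        // put a newline between consecutive values
    .toList
    .mkString
```

Because every element is a String here, the result stays a stream of String rather than widening to Any, as happens when Int values are mixed with "\n".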
Another nice one is zipWithNext
scala> Stream(1,2,3,4).zipWithNext.toList
res1: List[(Int, Option[Int])] = List((1,Some(2)), (2,Some(3)), (3,Some(4)), (4,None))
It pairs each element with the one that follows it, which is very useful if you want to remove consecutive duplicates.
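As a rough sketch of that idea, an element can be kept only when the next element differs from it. The sample values and the names readings and deduped are made up for illustration.

```scala
import fs2.Stream

// Hypothetical sample with runs of repeated values
val readings = Stream(1, 1, 2, 2, 2, 3, 1)

val deduped: List[Int] =
  readings.zipWithNext
    // keep the last element of each run: the one whose successor is different or absent
    .collect { case (current, next) if !next.contains(current) => current }
    .toList
```

Only consecutive repeats are dropped, so the trailing 1 survives even though 1 appeared earlier. FS2 also has a changes combinator for this purpose; the version above is only meant to show what zipWithNext makes possible.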
These are only a few of the many useful combinators; here is the full list
Obviously Stream can do a lot of things that List cannot, but the best feature is coming in the next section. It is all about how to use Stream in the real world with DBs/Files/APIs ...
Let's just say for now that this is our program:
scala> Stream(1,2,3)
res2: fs2.Stream[fs2.Pure,Int] = Stream(..)
What does this Pure mean? Here is the scaladoc from the source code:
/**
* Indicates that a stream evaluates no effects.
*
* A `Stream[Pure,O]` can be safely converted to a `Stream[F,O]` for all `F`.
*/
type Pure[A] <: Nothing
It means no effects, ok ..., but what is an effect? And more specifically, what is the effect of our program Stream(1,2,3)?
This program has literally no effect on the world. Its only effect will be to make your CPU work and consume some power! It does not affect the world around you.
By affecting the world I mean consuming a meaningful resource such as a file or a database, or producing something, such as writing a file, uploading data somewhere, or writing to your terminal.
Let's say I want to load user ids from a DB. I am given this function, which calls the DB and returns the userId as a Long:
scala> import scala.concurrent.Future
import scala.concurrent.Future
scala> def loadUserIdByName(userName: String): Future[Long] = ???
loadUserIdByName: (userName: String)scala.concurrent.Future[Long]
It returns a Future. We will see what that means in the next section, but for now assume it just wraps the value returned by the DB.
I have this Pure stream:
scala> val names = Stream("bob", "alice", "joe")
names: fs2.Stream[fs2.Pure,String] = Stream(..)
How do I get a Stream of ids?
The naive approach would be to use the map function; it should run the function for each value in the Stream:
scala> names.map(loadUserIdByName)
res3: fs2.Stream[fs2.Pure,scala.concurrent.Future[Long]] = Stream(..)
I still got back a Pure!!!
I gave the Stream a function that affects the world and I still got a Pure, not cool ...
It would have been neat if FS2 had automatically detected that the loadUserIdByName function has an effect on the world and returned something that is NOT Pure, but it does not work like that. Instead of map, you have to use a special combinator: evalMap.
scala> val userIdsFromDB = names.evalMap(loadUserIdByName)
userIdsFromDB: fs2.Stream[scala.concurrent.Future,Long] = Stream(..)
No more Pure! We got Future instead, yay! What just happened?
It took:
loadUserIdByName: String => Future[Long]
Stream[Pure, String]
And switched the types of the stream to
Stream[Future, Long]
It separated the Future and isolated it! The left side, which was the Effect type parameter, is now the concrete Future type.
Neat trick, but how does it help me?
You just witnessed true separation of concerns: you can continue to operate on the stream with all the nice List-like combinators, and you don't have to worry about whether the DB is down or slow, or about any of the other network (effect) concerns.
It all works until I want to use toList to get the values back:
scala> userIdsFromDB.toList
<console>:18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long]
userIdsFromDB.toList
^
What ???!!! I could swear that I used toList before and it worked. How can it say that toList is not a member of fs2.Stream[Future,Long] anymore?
It is as if this function was removed the moment I started using an effectful stream. That's impressive! But how do I get my values back?
scala> userIdsFromDB.compile
res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = fs2.Stream$ToEffect@fc0f18da
First we use compile to tell the Stream to combine all the effects into one, so we get a sort of big Future.
Now toList should work:
scala> userIdsFromDB.compile.toList
<console>:18: error: could not find implicit value for parameter F: cats.effect.Sync[scala.concurrent.Future]
userIdsFromDB.compile.toList
^
What ???!!! It still does not compile. That's because Future is not a good Effect type: it breaks the philosophy of separation of concerns, as explained in the next very important section.
## [IMPORTANT] The ONE thing to take away from this post
A key point here is that the DB has not been called at this point. Nothing has really happened; the full program does not produce anything.
def loadUserIdByName(userName: String): Future[Long] = ???
Stream("bob", "alice", "joe").evalMap(loadUserIdByName).compile
Yes, it might be surprising, but the major theme in FP is separating the
- Description of your program: a good example is the program we just wrote. It is a pure description of the problem "I give you names and a DB, give me back IDs"
and the
- Execution of your program: running the actual code and asking it to go to the DB
One more time: our program has literally no effect on the world besides making your computer warm, exactly like our Pure stream.
Code that does not have an effect is called pure, and that is what functional programming is all about: writing programs with functions that are pure. Bravo, you now know what FP is all about.
Why would you want to write code this way? Simple: to achieve separation of concerns between the IO parts and the rest of our code.
Now let's fix our program and take care of this Future problem.
As we said, Future is a bad Effect type because it goes against the separation of concerns principle. Future is eager in Scala: the moment you create one, it starts to execute on some thread. You have no control over the execution, so the separation between description and execution breaks. FS2 is well aware of that and does not let your program compile. To fix this we have to use a type called IO that wraps our bad Future.
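The eagerness of Future can be observed with the standard library alone. The following is only a sketch: fakeLoadUserId and the dbCalls counter are hypothetical stand-ins for a real DB call.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Counts how many times the pretend DB was hit
val dbCalls = new AtomicInteger(0)

// Hypothetical stand-in for loadUserIdByName: the "id" is just the name length
def fakeLoadUserId(userName: String): Future[Long] =
  Future { dbCalls.incrementAndGet(); userName.length.toLong }

// Creating the Future already schedules the side effect on a thread
val pending = fakeLoadUserId("bob")
val id = Await.result(pending, 5.seconds)
val callsAfterCreation = dbCalls.get()

// Waiting on the same Future again does not call the "DB" again: the result is cached
val idAgain = Await.result(pending, 5.seconds)
val callsAfterReuse = dbCalls.get()
```

There is no step between building the value and running it: by the time you hold a Future, the call is already in flight.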
That brings us to the last part: what is this IO type, and how do I finally get my list of userIds back?
scala> import cats.effect.IO
import cats.effect.IO
scala> Stream("bob", "alice", "joe").evalMap(name => IO.fromFuture(IO(loadUserIdByName(name)))).compile.toList
res8: cats.effect.IO[List[Long]] = IO$2104439279
It now compiles and gives us back an IO[List[Long]], but we still didn't get our IDs back, so one last thing must be missing.
IO comes from the cats-effect library.
First, let's finish our program and finally get our ids back from the DB:
scala> val userIds = Stream("bob", "alice", "joe").evalMap(name => IO.fromFuture(IO(loadUserIdByName(name))))
scala> userIds.compile.toList.unsafeRunSync
scala.NotImplementedError: an implementation is missing
The proof that it's doing something is the fact that it's failing.
def loadUserIdByName(userName: String): Future[Long] = ???
When ??? is called you get a NotImplementedError. It means the function was executed (as opposed to before, when we made the point that nothing was really happening). Once we implement this function, it will go to the DB and load the ids; it will have an effect on the world (network/file system).
IO[Long] is a description of How to get a value of type Long, and it most certainly involves doing some I/O, i.e. going to the network, loading a file, ...
It's the How and not the What
IO does not wrap anything! It describes how to get the value from the network, and if you want to execute this description you can use unsafeRunSync or another function prefixed with unsafe. You can guess why they are named this way: this is the point where they call the DB, and it could fail if, for example, your Internet connection is down.
- Concern 1: Description => IO[A]
- Concern 2: Execution => io.unsafeRunSync
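The two concerns can be made visible with a counter. This is a minimal sketch, assuming cats-effect 3, where unsafeRunSync needs an implicit runtime; on the 1.x/2.x line the runtime import should be dropped. fakeLoadUserId and dbCalls are hypothetical stand-ins for a real DB call.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // assumption: cats-effect 3; not needed on 1.x/2.x
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times the pretend DB was hit
val dbCalls = new AtomicInteger(0)

// Hypothetical stand-in for a DB lookup: the "id" is just the name length
def fakeLoadUserId(userName: String): IO[Long] =
  IO { dbCalls.incrementAndGet(); userName.length.toLong }

// Concern 1: building the description runs nothing
val description: IO[Long] = fakeLoadUserId("alice")
val callsBeforeRun = dbCalls.get()

// Concern 2: executing the description performs the side effect
val id = description.unsafeRunSync()
val callsAfterRun = dbCalls.get()

// Running the same description again performs the effect again
val idAgain = description.unsafeRunSync()
val callsAfterSecondRun = dbCalls.get()
```

Unlike a Future, an IO value can be passed around, combined, and run several times, and nothing happens until one of the unsafe functions is called.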
- Stream is a supercharged version of List
- Stream lets you model infinite streams, so it does not have a size method
- Stream lets you work on data that would not fit in memory, like reading from a large file and writing back to another file
- Stream(1,2,3) is of type Stream[Pure, Int]; the second type, Int, is the type of all values that this stream will generate
- Pure means no effect on the world. It just makes your CPU work and consumes some power, but besides that it does not affect the world around you.
- Use evalMap instead of map when you want to apply a function that has an effect, like loadUserIdByName, to a Stream
- Stream[IO, Long] separates the concerns of What and How by letting you work only with the values and not worry about how to get them (loading from the DB).
- Separating program description from evaluation is a key aspect of FP
- All the programs you write with Stream will do nothing until you use unsafeRunSync; before that your code is effectively pure
- IO[Long] is an effect type that tells you: you will get Long values from IO (could be a file, the network, the console ...). It is a description, not a wrapper
- Future does not abide by this philosophy and thus is not compatible with FS2; you have to use the IO type instead
- Hands-on screencast by Michael Pilquist: https://www.youtube.com/watch?v=B1wb4fIdtn4
- Talk by Fabio Labella: https://www.youtube.com/watch?v=x3GLwl1FxcA