Moving from imperative programming to functional programming requires a fairly large shift in mindset. Moving away from your mutable variables and control flow loops to a streams-based system of handling side effects requires a similar shift in mindset.
A Task
encapsulates side effects. You can do just about anything you want in a Task
. A Process
as a stream of a specific piece of data. Think of it as a list of that data. For example, if you have a counter in your application, a single Process
would be used to represent that Int
. Think of a Process
as a function that holds state that waits to emit a value when it's asked to. A Process
is a state machine that can be in one of three states: emitting values, awaiting for the result of some request, or halted. It represents all values that you expect to get from your function. Those values could be simple, like a list of Int
s, or it could be far more complex, like asynchronously waiting for responses from a remote server.
The power you gain is in isolating side effects.
There are likely better solutions to some of these. Do let me know.
- Print the combined values of two
Process
es
import scalaz.Task
import scalaz.stream.Process
object Examples {
// A counter that starts at 0 and ends at numEpisodes, exclusive.
def boundedCounter(numEpisodes: Int): stream.Process[Nothing, Int] = stream.Process.range(0, numEpisodes)
// A counter that starts at n and increments by 1 every time it produces a value.
def unboundedCounter(n: Int): stream.Process[Nothing, Int] = stream.Process(n) ++ unboundedCounter(n + 1)
def logTask(string: String): Task[Unit] = Task { println(string) }
val combinedProcess = episodeID(2) zip unboundedCounter(0)
val testProcess = combinedProcess flatMap { nums =>
stream.Process eval logTask(s"${nums}")
}
testProcess.run.run
}
This prints (0, 0)
, (1, 1)
. Although the unboundedCounter
could have continued producing values, the boundedCounter
was depleted so the combined process halted.
I recommend against using print
or println
. Instead, use scalalogging. I'm leaving other libraries out of these examples, though.
A Sink
is for applying a side effect to the result of a Process
. A simple example is printing the output of a Process
.
- Use a
Task
to encapsulate a side effect. - Use
Task.async
to encapsulate a method that has a callback - Use a
Process
to represent a stream of your data that you wish to operate on. Remeber that Scala has a built-inProcess
class that's used for executing subprocesses. If you need to use both in the same file, you'll need to reference one of them by its package:import scalaz.stream
and thenstream.Process
. - Use
++
to chainProcess
es to execute more than one. Be careful of your synatx when you do this. As an example, useProcess.eval
instead ofProcess eval
to make sure the++
operator has the right operands. - Use a
Sink
to represent a function that receives and operates on the output of aProcess
. - Use a
Queue
to asynchronously receive and store output from callback-based APIs or to create a loop where the end of aProcess
leads to its own beginning. - Use a
topic
like aQueue
that has multiple subscribes receiving the data - Use a
wye
to nondeterministically merge multipleProcess
es. - Use a
tee
to deterministically merge multipleProcess
es. - Use
flatMap
to operate on the output of aProcess
, potentially using it as an input to anotherProcess
.
- Example of restarting an unbounded counter at the tick of a bounded counter
- Exception handling, runAsync, onComplete
wye.mergeHaltBoth
wye.interrupt
- Discussion of thread pools and setting the
Strategy
onwye
. - Use
.attemptRun
instead of.run
- Ask for guidance on the strange behavioral difference between
val s = Task async { 1 }
s.run.runAsync{}
which will execute over and over and (signal wye s)(wye.interrupt)
, which will execute only once.