@xanderdunn
Last active October 23, 2015 02:48
Scalaz-Stream By Example

Mindset

Moving from imperative programming to functional programming requires a fairly large shift in mindset. Moving away from your mutable variables and control flow loops to a streams-based system of handling side effects requires a similar shift in mindset.

A Task encapsulates side effects. You can do just about anything you want in a Task. A Process is a stream of a specific piece of data. Think of it as a list of that data. For example, if you have a counter in your application, a single Process would be used to represent that Int. Think of a Process as a function that holds state and waits to emit a value until it's asked to. A Process is a state machine that can be in one of three states: emitting values, awaiting the result of some request, or halted. It represents all the values that you expect to get from your function. Those values could be simple, like a list of Ints, or far more complex, like responses that arrive asynchronously from a remote server.
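As a rough illustration of the two ideas above, here is a minimal sketch, assuming scalaz 7.1 and scalaz-stream 0.7 or 0.8. The names `MindsetSketch`, `effectCount`, `increment`, and `collect` are made up for this example. It shows that a Task only describes an effect until it is run, and that a pure Process can be read like a list.

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process

object MindsetSketch {
  // Hypothetical counter, used only to observe when the side effect runs.
  var effectCount = 0

  // Task.delay describes the effect; nothing happens until the Task is run.
  val increment: Task[Int] = Task.delay { effectCount += 1; effectCount }

  // A pure Process behaves like a list of the values it will emit.
  val pureValues: List[Int] = Process.range(0, 3).toList

  // Process.eval lifts a Task into a stream that emits the Task's result.
  val effectful: Process[Task, Int] = Process.eval(increment)

  // runLog collects the emitted values into a Task; run executes that Task.
  def collect(): Vector[Int] = effectful.runLog.run.toVector
}
```

Creating `effectful` does not touch `effectCount`; the counter only changes when `collect()` is called, and each call runs the effect again.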

The power you gain is in isolating side effects.

Examples

There are likely better solutions to some of these. Do let me know.

  1. Print the combined values of two Processes
import scalaz.concurrent.Task
import scalaz.stream

object Examples {
  // A counter that starts at 0 and ends at numEpisodes, exclusive.
  def boundedCounter(numEpisodes: Int): stream.Process[Nothing, Int] = stream.Process.range(0, numEpisodes)
  // A counter that starts at n and increments by 1 every time it produces a value.
  def unboundedCounter(n: Int): stream.Process[Nothing, Int] = stream.Process(n) ++ unboundedCounter(n + 1)
  def logTask(string: String): Task[Unit] = Task { println(string) }
  
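  // zip halts as soon as either side halts.
  val combinedProcess = boundedCounter(2) zip unboundedCounter(0)
  val testProcess = combinedProcess flatMap { nums =>
    stream.Process.eval(logTask(s"${nums}"))
  }
  // The first run turns the Process into a Task; the second run executes that Task.
  testProcess.run.run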
}

This prints (0,0) and then (1,1), one pair per line. Although the unboundedCounter could have continued producing values, the boundedCounter was depleted, so the combined process halted.
I recommend against using print or println. Instead, use a logging library such as scala-logging. I'm leaving other libraries out of these examples, though.

A Sink is for applying a side effect to the result of a Process. A simple example is printing the output of a Process.
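A minimal sketch of a Sink, assuming scalaz-stream 0.7 or 0.8, where `sink.lift` is available. To keep the result easy to inspect, this version appends to an in-memory buffer rather than printing. The names `SinkSketch`, `seen`, `record`, and `runOnce` are hypothetical.

```scala
import scala.collection.mutable.ListBuffer
import scalaz.concurrent.Task
import scalaz.stream.{Process, Sink, sink}

object SinkSketch {
  // Hypothetical in-memory log standing in for a console or a file.
  val seen = ListBuffer.empty[String]

  // sink.lift turns an effectful function into a Sink.
  val record: Sink[Task, String] =
    sink.lift((s: String) => Task.delay { seen += s; () })

  // toSource turns the pure Process into a Process[Task, Int].
  val source: Process[Task, String] =
    Process.range(0, 3).toSource.map(_.toString)

  // `to` feeds every emitted value into the Sink.
  def runOnce(): Unit = source.to(record).run.run
}
```

For the printing case, replacing `record` with `scalaz.stream.io.stdOutLines` should write each value to standard output instead.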

Overview of Common Tools

  • Use a Task to encapsulate a side effect.
  • Use Task.async to encapsulate a method that has a callback.
  • Use a Process to represent a stream of your data that you wish to operate on. Remember that Scala's standard library has its own Process (scala.sys.process.Process) that's used for executing subprocesses. If you need to use both in the same file, you'll need to reference one of them by its package: import scalaz.stream and then stream.Process.
  • Use ++ to chain Processes to execute more than one. Be careful of your syntax when you do this. As an example, use Process.eval instead of Process eval to make sure the ++ operator has the right operands.
  • Use a Sink to represent a function that receives and operates on the output of a Process.
  • Use a Queue to asynchronously receive and store output from callback-based APIs or to create a loop where the end of a Process leads to its own beginning.
  • Use a Topic like a Queue that has multiple subscribers receiving the data.
  • Use a wye to nondeterministically merge multiple Processes.
  • Use a tee to deterministically merge multiple Processes.
  • Use flatMap to operate on the output of a Process, potentially using it as an input to another Process.
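Several of the tools in this list can be sketched together. This is a sketch under stated assumptions rather than a reference implementation: it assumes scalaz 7.1 with scalaz-stream 0.7 or 0.8, and `ToolsSketch`, `legacyFetch`, and the other member names are invented for illustration.

```scala
import scalaz.\/-
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, async}

object ToolsSketch {
  // Queues and wye-based merges need a Strategy to schedule work.
  implicit val S: Strategy = Strategy.DefaultStrategy

  // Hypothetical callback-based API, wrapped with Task.async.
  def legacyFetch(onDone: Int => Unit): Unit = onDone(42)
  val fetched: Task[Int] =
    Task.async { cb => legacyFetch(n => cb(\/-(n))) }

  // ++ chains two Processes. With infix syntax, `Process eval a ++ b`
  // would parse as Process.eval(a ++ b), so use method-call syntax here.
  val chained: Process[Task, Int] =
    Process.eval(fetched) ++ Process.eval(Task.now(43))

  // interleave is built on a tee: it alternates left and right deterministically.
  val interleaved: Process[Task, Int] =
    Process(1, 3, 5).toSource.interleave(Process(2, 4, 6).toSource)

  // merge is built on a wye: the output order is not guaranteed.
  val merged: Process[Task, Int] =
    Process(1, 2).toSource.merge(Process(3, 4).toSource)

  // A Queue buffers values pushed from elsewhere; dequeue reads them as a
  // Process that halts once the queue is closed and drained.
  def viaQueue(): Vector[Int] = {
    val q = async.unboundedQueue[Int]
    val producer =
      q.enqueueOne(1).flatMap(_ => q.enqueueOne(2)).flatMap(_ => q.close)
    producer.run
    q.dequeue.runLog.run.toVector
  }
}
```

Because `merged` is nondeterministic, any check on it should compare the values without relying on their order, for example by sorting them first.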

To Do

  • Example of restarting an unbounded counter at the tick of a bounded counter
  • Exception handling, runAsync, onComplete
  • wye.mergeHaltBoth
  • wye.interrupt
  • Discussion of thread pools and setting the Strategy on wye.
  • Use .attemptRun instead of .run
  • Ask for guidance on the strange behavioral difference between
val s = Task async { 1 }
s.run.runAsync{}

which will execute over and over, and (signal wye s)(wye.interrupt), which will execute only once.
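As a starting point for the exception handling and .attemptRun items above, here is a hedged sketch, assuming scalaz 7.1 (where Task has `attemptRun`) and scalaz-stream 0.7 or 0.8. The names `AttemptRunSketch`, `failing`, `guarded`, `cleanedUp`, and `outcome` are hypothetical.

```scala
import scalaz.{-\/, \/-}
import scalaz.concurrent.Task
import scalaz.stream.Process

object AttemptRunSketch {
  // Hypothetical flag, used only to observe that the cleanup step ran.
  var cleanedUp = false

  // A Process whose single step throws when it is evaluated.
  val failing: Process[Task, Int] =
    Process.eval(Task.delay[Int] { throw new RuntimeException("boom") })

  // onComplete appends a step that runs whether the first part
  // ends normally or with an error.
  val guarded: Process[Task, Int] =
    failing.onComplete(Process.eval_(Task.delay { cleanedUp = true }))

  // attemptRun returns the failure as a value instead of throwing it.
  def outcome(): String = guarded.runLog.attemptRun match {
    case -\/(e)      => s"failed: ${e.getMessage}"
    case \/-(values) => s"ok: $values"
  }
}
```

With plain `.run`, the same exception would be thrown at the call site; `attemptRun` keeps it inside a `Throwable \/ A` value so the caller has to handle both cases.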
