Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Introduction to scalaz-stream

Introduction to scalaz-stream

Every application ever written can be viewed as some sort of transformation on data. Data can come from different sources, such as a network or a file or user input or the Large Hadron Collider. It can come from many sources all at once to be merged and aggregated in interesting ways, and it can be produced into many different output sinks, such as a network or files or graphical user interfaces. You might produce your output all at once, as a big data dump at the end of the world (right before your program shuts down), or you might produce it more incrementally. Every application fits into this model.

The scalaz-stream project is an attempt to make it easy to construct, test and scale programs that fit within this model (which is to say, everything). It does this by providing an abstraction around a "stream" of data, which is really just this notion of some number of data being sequentially pulled out of some unspecified data source. On top of this abstraction, scalaz-stream provides a large number of tools for manipulating, slicing, transforming, merging and outputting (totally a word). These tools, generally referred to as combinators, are clean and orthogonal, serving a minimal and targeted purpose with easy-to-understand rules and guarantees.

Among these combinators are tools for merging streams in interesting ways, often involving concurrency. Scalaz-stream makes it very easy to model data transformations that are run in parallel, taking full advantage of multi-core and (we hope soon!) multi-server distributed evaluation. However, scalaz-stream makes no attempt to "guess" about what problems make sense to run concurrently or what sort of granularity seems best. If you want concurrency, scalaz-stream makes it easy to get, but you must ask for it. This is a central tenant of the whole library design, and it results in code which is clean, well-factored and has few surprises when it comes time to evaluate.

This article will introduce the basic principles of scalaz-stream, with a particular focus on doing useful things easily and in a testable and scalable fashion. No knowledge of "advanced functional programming" (e.g. Scalaz) is assumed! You don't need to be well-versed in category theory to benefit from the power and cleanliness of scalaz-stream, and this article well help you get started.

Prerequisites

Scalaz-stream depends on the Scalaz library for technical reasons. At present, there are two outstanding and supported versions of Scalaz: 7.0.6 and 7.1.0. The latter is of course more contemporaneous and has numerous features that are absent in 7.0.6, but also some incompatibilities which forces certain projects to use the older version. If you have one of those projects, and you're stuck on Scalaz 7.0.6 for other reasons, you need to use the 0.7 version of scalaz-stream. Otherwise, if you don't already have a dependency (direct or indirect) on Scalaz or you're free to upgrade, use 0.7a. Thus, your SBT configuration will look like the following:

resolvers += "Scalaz Bintray Repo" at "http://dl.bintray.com/scalaz/releases"

libraryDependencies += "org.scalaz.stream" %% "scalaz-stream" % "0.7a"

All of the scalaz-stream classes and modules are contained within the scalaz.stream package. By far, the most useful type in this package is Process, which represents a sequential stream of…things. We'll get more into what this means in a moment, but for now it is sufficient just to get the dependencies setup.

Transforming and Running Processes

The scalaz.stream.Process type represents a possibly infinite sequential stream of stuff. What that stuff is depends on the two type parameters that you give to Process. For example:

def foo(p: Process[Nothing, String]) = ???

In the above, it looks like p is a stream of String values. I know from the first parameter, Nothing, that there are no effects within the stream. Controlling the existence (or non-existence) of effects is a central design philosophy of scalaz-stream, and it's a significant part of what makes it such a powerful and useful tool. For now, think of an effect as something like "reading from a file", "receiving data from an AJAX request", or really anything that involves dealing with "the outside world".

def foo(p: Process[Task, Request]) = ???

Ah, now this is something more interesting! The Nothing has been replaced with Task, which means that somewhere inside of p, we're talking to the outside world. Or rather, we will talk to the outside world when we actually go to evaluate p. More on this in a bit!

You'll also notice that the second type parameter is now Request. Just guessing, but it looks a bit like the above is a stream of external requests coming into some sort of server (us), possibly over HTTP or some other protocol. I would guess that the above stream is unbounded, since we are going to want to continue processing requests forever, or rather until someone asks us to stop. Unbounded, or infinite streams are tremendously useful tools. It's just so easy to say "here is a stream that represents all requests I will ever receive"! Having such a stream as a first class "thing" that we can manipulate and get our hands around is very very nice indeed.

But I'm getting ahead of myself. Streams aren't very interesting if we can't do things with them, and really that's what scalaz-stream is all about: manipulation and transformation of data (and effects). So what can we do with a Process?

Well, the answer is: nearly everything that we can do with a Seq! If you're familiar with Scala's collections library, you should feel very at home working with Process. The map, flatMap, filter, take, drop, collect, zip (and more!) functions are all available and work exactly as you would expect. As an example, let's create a simple process that has some hard-coded data and do a bit of trivial transformation:

val p = Process(5, 4, 3, 2, 1)

val results = p collect {
  case 1 => "one"
  case 2 => "two"
  case 3 => "three"
} filter { _.length > 3 } map { _.toUpperCase }

Pretty standard stuff. We're collecting on the input Int(s), filtering and then mapping. If p were a Vector or a List, we would expect results to contain something like Vector("THREE"). But while Process may look like a collection, it's not! The results variable is of type Process[Nothing, String] (no effects were harmed in the making of this stream), and it doesn't really "contain" anything. Rather, results knows how to compute the sequence of data that will result from our pipeline of operations, but it hasn't actually done the work yet. If we want to ask it to do the work, we need to put in a bit more effort.

Running Streams

results.toSource.runLog.run         // => Vector("THREE")

The toSource in the above is a bit weird, and not actually something you'll be doing very often. We need it here because scalaz-stream actually doesn't understand how to run a Process that doesn't have effects! toSource takes our effect-less stream and makes it pretend to have effects in it (even though there aren't any there) so that runLog can do its work.

Now, runLog.run is obviously a very clumsy incantation. The essence of it is that runLog instructs Process to compile its pipeline down to a single atomic operation (in this case, a Task) which when run produces a Seq[String]. It would of course be a Seq[Int] if our input process (results) contained Int data, or Seq[Request] if it contained requests. As the second sentence of this paragraph implies though, runLog alone does not actually run the Process (this distinction can get even weirder if we use the run interpreter rather than runLog, since neither actually run the process). Instead, it produces a single atomic operation, which in our case is a Task (it is determined by the first type parameter given to Process) which can be run to produce the final output.

There are some very good reasons for this convoluted dance, but they get into some more complex aspects of the framework. The good news is that, in practice, you really don't spend a lot of time running processes! After all, why would you run a process to completion when you can just compose it with another process? In practice, most programs have one instance of runLog.run (or more conventionally, run.run, which discards the individual results) right at the very end of the program. If you have more than one spot in your code (outside of tests, of course) where you say p.run.run on some process p, you're probably doing something wrong and you should look for a built-in combinator that allows you to compose your processes more effectively.

Composition and Infinities

Coming back to process composition, we also have the ability to merge streams together using standard combinators like zip. For example:

val names = Process("one", "two", "three")
val nums = Process(1, 2, 3)

names zip nums          // => Process(("one", 1), ("two", 2), ("three", 3))

If you recall from earlier, Process represents a possibly infinite stream of data. We can exploit this functionality, just as with scala.Stream (which is otherwise very different from Process!) to do the above in a slightly "cooler" way:

val names = Process("one", "two", "three")
def nums(n: Int): Process[Nothing, Int] = Process(n) ++ nums(n + 1)

names zip nums(1)          // => Process(("one", 1), ("two", 2), ("three", 3))

Obviously far less concise than the first version, but it just feels more awesome. The important point is that we're getting comfortable working with infinite streams of data. As I alluded to earlier with the "infinite stream of requests" example, infinite streams are enormously convenient as a tool. Just at a conceptual level, if you model your entire program as a composition of streams, and your program is some sort of server that is meant to run until killed, then you must have some sort of infinite stream (or streams plural!) inside of your program. This is the rule, not the exception to how scalaz-stream is used.

Now of course, given that scalaz-stream allows the representation of infinite streams, some familiar collections operations just don't make sense:

def nums(n: Int): Process[Nothing, Int] = Process(n) ++ nums(n + 1)

nums(0).length          // doesn't compile!

You can't ask for the length of a Process. It's possible to implement the length function, but if you tried to use it on an infinite process, it would just hang forever. Note that it also isn't possible to "index" into a Process, to for example ask for the fourth element. With scalaz-stream, the stream is what is interesting, not really any one particular piece of data within that stream. So if you find yourself hunting around for tools to treat Process as if it were a Vector or even a List, you should probably adjust the way you're using the library, because you'll get much more out of it by embracing the stream paradigm!

Finding More

There are a ton of very very useful and yet simple transformations on Process that are very "collection-like". Most of the single-Process transformations (like collect) are implemented through a powerful abstraction called Process1, and then implicitly added to the Process type. You can find all of the built-in single-Process transformations in the scalaz.stream.process1 object. Similarly, nearly all of the two-Process transformations (like zip) are implemented through a powerful abstraction called Tee, and similarly implicitly exposed. You can find all of the built-in two-Process transformations in the scalaz.stream.tee object. So if you're not sure whether or not a particular function exists on Process, you can go hunting through those modules to perhaps find what you're looking for.

Performing Actions

As I've been insisting all along, Process represents a (possibly infinite) sequence of stuff, where the stuff could be data and it could be effects. An effect is talking to the outside world in some way, or receiving data from the outside world, or otherwise doing useful things. Effects are encapsulated inside of the functor in which the Process is evaluating. This is a ton of terminology all at once, but the short way of looking at it is that your effects are all managed by the first of the two type parameters on Process. For example:

  • Process[Nothing, Int] - Cannot contain any effects at all!
  • Process[Task, Int] - Probably talks to the outside world in very complicated ways, and perhaps involves concurrency
  • Process[Burrito, Int] - Contains effects, but apparently only contains effects related to consumption of Chipotle

Just as a function of convenience, most of the time you're going to be dealing with process types of the form Process[Task, _] (fill in the _ with your data type). Task is the most general effect possible, and it's also the only effect which allows concurrent composition of streams. With that said, it is tremendously useful to be able to have more restricted effects, like Burrito in the above. After all, if the only effects your stream can perform is to eat a delicious meal, then your stream becomes that much easier to test! You don't have to worry about mocking a database or stubbing out a network client if the stream you're trying to test cannot possibly talk to a database or a network. Task effects can talk to databases and networks. Burrito effects cannot, and this is a secret magic protip for terrifyingly easy testing.

But back to actions… Our program needs to, at some level, talk to a database or read from a network channel or launch the missiles, and this is where Task comes in. For example, here are a pair of actions which read from and write to stdout, respectively:

def puts(ln: String): Task[Unit] = Task { println(ln) }
val gets: Task[String] = Task { Console.readLine() }

If you're reading someone else's code, you might also see actions constructed using Task.delay. Don't worry too much about the difference between Task.delay and Task.apply (what we're using above). If you're using scalaz-stream, the differences between these two constructors are mostly meaningless, so you may as well use the shorter one!

So how do we put actions into a stream? Very simply, as it turns out:

val p = Process eval puts("Hello, World!")          // => Process(())

Now, p in the above is a Process[Task, Unit], so it doesn't really produce any useful values. What it does do though is perform an action, which is to say, run println! If we were to interpret p by calling p.run.run, we would get the classic results printed to our console.

Processes that contain effects can be composed using the exact same tools that we've been using to play around with processes that do not contain effects. And this is the power of scalaz-stream! For example, we could write an echo terminal in the following way:

val lines = Process repeatEval gets

val p = lines flatMap { line =>
  Process eval puts(line)
}

We haven't seen repeatEval yet, but it's very much like eval except that it keeps evalling forever. So, lines is a Process[Task, String] that represents every line the user will ever enter via stdin. We then flatMap into this infinite stream of lines, and for each one, we eval the puts action on that line. Actually, this pattern of having some effectful "destination" for data is so common that scalaz-stream uses higher-order design to provide a first-class manifestation in the form of Sink.

Sink[Task, A] is simply equivalent to Process[Task, A => Task[Unit]], which is to say a stream of functions, where each function performs an action. It's a very simple idea, but it's powerful enough to represent everything that we would want from terminal points for our data processing.

val lines = Process repeatEval gets
val stdout = Process constant (puts _) toSource

val p = lines to stdout

This snippet does exactly the same thing as the previous example with flatMap. The only difference is that we're lifting puts into a sink (stdout) of type Sink[Task, String] and using the to combinator to connect lines into this sink. The end effect is the same, just substantially more concise.

Incidentally, the constant and toSource bits aren't magic or in any way specific to Sink (in fact, we saw toSource before). The constant constructor creates a Process[Nothing, A] given a value of type A (in this case, puts _), where the process is an infinite stream of that one value, over and over and over. In this case, that is precisely what we want for a stdout sink: the way in which you print to stdout doesn't vary over time. However, some sinks, for example a network load balancer abstracting over several destinations, might want to have a different "write" function as the stream goes along. Since Sink is just a Process, we clearly have more than enough power to support this kind of thing, but we don't need it here.

And as before, toSource is just a bit of faerie dust that converts our Process[Nothing, A => Task[Unit]] into a Process[Task, A => Task[Unit]] by pretending there are effects of type Task. Remember, the value puts _ is not an effect, it's just a value! It may produce an effect (of type Task[Unit]), but it is not in and of itself an effect. This is why we can lift it into an infinite stream using constant rather than repeatEval.

The to combinator is very similar to what we wrote earlier with flatMap. In fact, it's almost precisely the same thing! It repeatedly takes a value from the left (the source) and a function from the right (the sink) and applies the function to the value, evaling the results into the stream. Thus, p has type Process[Task, Unit]. It's an infinite stream of effects, all of which produce () as a result. Of course, () isn't a very useful value, so clearly what we're interested in here is just the effects. For that reason, if we run p, we're probably not going to use runLog (which would give us a Task[Seq[Unit]]). Instead, we'll just use run, which produces a Task[Unit], effectively discarding any data produced from the process and only keeping the effects.

p.run.run

The above will run until we hit Ctrl-C, reading lines one at a time from the user and immediately echoing them back out.

Callbacks

A very, very common situation in asynchronous code is to run into an API which is structured around a callback or listener system. For example, when you're using Java's NIO library, you accept connections on a server channel by passing the channel a listener that will be invoked once a connection is established. For example, something like this (simplified pseudocode):

val channel = ...
channel.accept(new ConnectionListener {
  def ready(connection: Connection): Unit = {
    // read data out of connection here!
  }

  def failed(t: Throwable) = {
    // maybe log?
  }
})

If scalaz-stream didn't provide a clean way of interacting with these sorts of APIs, then it wouldn't be a very useful framework for writing practical asynchronous applications! The magic to address this situation is wrapped up in the Task.async constructor.

Thus far, all of our Task actions have been constructed using Task.apply, which takes a block of code that will be run at some future time, probably on a different thread. Task.async is a little different. This constructor takes a function which in turn receives a callback. When the callback is invoked, the Task is completed. So rather than waiting for a definite block of code to run to completion, it creates a Task which waits for a listener to be notified! We can modify the above example to produce a Task[Connection] using Task.async:

val accept: Task[Connection] = Task async { cb =>
  channel.accept(new ConnectionListener {
    def ready(connection: Connection) = cb(right(connection))
    def failed(t: Throwable) = cb(left(t))
  })
}

So accept is an effect which asynchronously (and without blocking!) accepts an incoming Connection on channel. If and when a connection is received, it is used to complete the task by invoking cb with right(connection). The "right" here comes from Scalaz's Either type, which is very similar to scala.Either. If we hit an error, we complete the task with that error by invoking cb with left(t). It's as simple as that!

I can even work with streams of asynchronous effects just as easily as I can work with streams of synchronous effects!

val server: Process[Task, Connection] = Process repeatEval accept

So, server is a stream of every connection that channel will ever receive. These connections will be received asynchronously, without blocking any threads, so we're not sacrificing any throughput to have this nice abstraction.

Now you're starting to see where scalaz-stream can bring real tangible value to a real world application. Modeling a server as an infinite stream of connections makes it very very simple to write functions that work with those connections, filter out undesired ones, and so on. We don't have to worry about crazy asynchronous loops or thread forking or any of that madness. Everything just works, and it's very very simple and concise.

Resource Safety

One of the more useful features that scalaz-stream surrounding effects is the ability to guarantee resource safety, even in the face of interrupts, shutdown, errors, and the like. It wouldn't be very practically useful to provide a mechanism for opening a network channel without also providing some way of guaranteeing that channel is inevitably closed! This feature is satisfied by onComplete.

Now, onComplete is a bit of a weird beast. It's a combinator on Process that takes another Process as its parameter. The parameter (on the right hand side) is guaranteed to run exactly once when the left hand side "completes", where by "completes" I mean runs to completion, hits an error, or even is interrupted by some other concurrent process. It's a bit like the finally in a try/finally block. The trick though is that an onComplete parameter will not run if the left hand side never runs in the first place! Thus, you can construct finalizers (using onComplete) that seem like they're always going to be hit, but in fact which sometimes do not run. The scalaz-stream project has some future plans to alleviate this confusion, but for now, just try to be careful to attach your onComplete to a "top level" process in a composition (i.e. not a process within the right side of an ++ or inside of a flatMap).

Grabbing a resource and then releasing it with a finalizer is very straightforward, though we can't do it (safely) with eval. Instead, we're going to use the slightly lower-level await.

val p: Process[Task, Array[Byte]] = Process.await(connect) { channel =>
  val reads = Process repeatEval (Task async channel.accept)
  reads onComplete (Process eval_ (Task { channel.close() }))
}

There are a few new things here. First off, await is a constructor on Process that performs an effect (in this case, connect) and passes the results of that effect to the body of the function it is given. This function then uses the results of the effect to compute a new Process (in our case, reads) which gets returned from the body. Any values produced by the process within the body of the await will be the values produced by the process returned from await: namely, p in our example. Thus, await is a way of computing a stream based on the results of a single effect. The eval constructor is simply defined in terms of await:

def eval[F[_], A](effect: F[A]): Process[F, A] =
  await(effect) { a => Process(a) }

Speaking of eval, the example from earlier contains a bit which reads Process eval_. This isn't a typo! The eval constructor comes in two flavors: with and without an underscore. The only difference is that eval produces the value which was computed by its effect, while eval_ performs the effect (just like eval) and then throws away the value. This is really useful for a finalizer, since we just want to clean up some resources (which is an effect!) and we don't care about producing more values.

The really interesting bit though is onComplete. This is where the magic happens for finalization. If and only if the connect effect completes (as in, it produces a channel), the finalizer (the right side of onComplete) is guaranteed to run exactly once. This is an invariant that the library will preserve regardless of errors or interrupts. If you get a channel, then you also get the chance to dispose of it with onComplete.

Building a complex application with scalaz-stream necessitates a lot of resource allocation and deallocation. The onComplete combinator is the tool with which you tame this mess. It allows you to keep your resource management local and guaranteed, so that you never have to worry about resources leaking because of some weird concurrent exception or interrupt thing caused by another process way over on the other side of town. You can produce streams of data that require resource management to compute, and you can use those streams safely without having to worry about any of the details of when their resources are acquired and when they might (or might not be!) released.

Naturally, this provides an excellent foundation for a file read/write API. For convenience, scalaz-stream does provide several functions within the scalaz.stream.io module which satisfy this use case. For example, the following process reads all of the lines from the "foo.txt" file, computes the length of each line, and then writes those lengths out to "foo-lengths.txt".

val p: Process[Task, Unit] = io linesR "foo.txt" map { _.length } to (io linesW "foo-lengths.txt")

All of this is resource safe and incremental. In other words, we're not reading the entire "foo.txt" file into memory before computing the lengths and then writing the whole thing out again. In fact, foo.txt could be arbitrarily large, and while the program would naturally take longer to complete, it wouldn't use any more memory. Try writing that by hand!

Another example would be replicating the Unix cat utility using scalaz-stream.

def cat(file: String): Unit = {
  def puts(ln: String): Task[Unit] = Task { println(ln) }
  val stdout = Process constant (puts _) toSource

  (io linesR file to stdout).run.run
}

As a side-effect of all of this (no pun intended), scalaz-stream is probably the easiest way to read or write a file in Scala. The equivalent code to the above using scala.io.Source is dramatically more verbose and easier to get wrong, in addition to being resource unsafe. I use the above combinators (and the others like them) all the time for really simple "script-ish" things where I just need to mash some data into a file and/or slurp it back out again.

Merging and Concurrency

Outside of really straightforward tricks with file reads and the like, basically all useful applications have multiple sources of data and multiple sinks that they need to populate, depending on data and program state. Scalaz-stream would be a truly useless library if it didn't provide some way of bringing data in from multiple sources, aggregating and transforming, feeding back out to various sinks, reading in from new sources, and so on. Fortunately, scalaz-stream goes well above and beyond the minimum feature checkbox here, and its combinators for merging streams are powerful, simple, and form the backbone of its concurrency support.

This last bit is worth dwelling on for a moment. As discussed, a Process is a possibly infinite sequence of stuff, with the key word here being "sequence". When a Process containing the values a, b and c (in that order) is evaluated, we must finish computing a before we go back to start computing b, which in turn we must finish before we start computing c. Process does not pipeline operations in sequence (note that this is a significant difference from how Akka Flows work). This is very nice because it makes it very easy to reason about what effects are performed one-after-the-other, and furthermore makes it very easy to enforce a strong ordering between effects when one is needed (just put them in order in a Process!).

The problem with a strongly sequential paradigm is that concurrency must be somehow achieved in other ways, and this is where the merge combinators become significant. If you have two strongly sequential streams, neither one of them will have any concurrency in its own computation. However, you can merge them together into a single strongly sequential stream, where the exact interleaving between the two streams is nondeterministic. This is to say that two streams, Process(a, b, c) and Process(x, y, z), could be merged into a single stream Process(a, b, x, c, y, z), or perhaps Process(x, a, b, c, y, z), or Process(x, y, z, a, b, c), or others. We will never see b before we see a in the output stream, nor will we see z before we see x, but the exact interleaving between the two stream values is allowed to float, and this is where concurrency comes from.

Since we are defining the merge operation to interleave two streams without constraining mutual ordering, scalaz-stream is free to run that merge in parallel. In fact, what it does under the surface is run both streams simultaneously using a function called stepAsync (this function is technically part of the public API, but it is very low-level and unsafe, so not recommended for use). The merge operation will allow either side to "race ahead" of the other side, only constrained by scheduling fairness (i.e. it won't allow one side to hog all the threads). So in a sense, you can think of the merge operation as "run both processes as fast as they can go, producing results as they are available in the order they arrive".

If you think about it, this is precisely the semantic that we want, not only for concurrency and multi-core throughput optimization, but also for combining multiple data sources into one! As an example, let's imagine you're writing some sort of social media analytics application, where you're grabbing data from various social media platforms, searching for your company's name, performing some sort of sentiment analysis on the contents and reporting the results to a data store as well as a real-time output feed. This is actually a relatively common thing that companies actually do, by the way, and it is very representative of how scalaz-stream can be used to write highly concurrent applications.

def computeSentiment(text: String): Double = ???     // there are tools for this

val facebook: Process[Task, Post] = ???         // talk to facebook somehow and get data in real-time
val twitter: Process[Task, Tweet] = ???         // twitter streaming API ftw

val fbText = facebook map { _.text }
val twText = twitter map { _.contents }

// merge the two feeds and process them together!
val sentiment = fbText merge twText filter { _ contains "Evil Corp" } map computeSentiment

val datastore: Sink[Task, Double] = ???
val streaming: Sink[Task, Json] = ???

sentiment observe datastore map { d =>
  json"""{"score": $d}"""
} to streaming

Don't blink or you'll miss it! The merge combinator in the above is what is doing all the magic here. It functions exactly the way we have described: taking data from both streams as fast as they can produce it, merging down into a single output stream. Presumably, both the facebook and the twitter streams involve some sort of network connection to an external API. In the above formulation, using merge, neither stream will "wait" for the other one. If Twitter is giving us data faster than Facebook is, then we will run the Twitter stream faster, pushing the results into the merged output process as they arrive.

Like many of the combinators in scalaz-stream (e.g. zip or filter), merge is actually implemented through a very general abstraction called Wye (similar to Tee, except nondeterministic). There are a lot of other extremely useful combinators implemented via Wye in the scalaz.stream.wye module. In addition to wye.merge, two others that I use on a daily basis are wye.mergeHaltBoth (similar to wye.merge, except the resulting stream ends as soon as either input stream ends) and wye.interrupt (allows you to remotely "kill" a stream; very useful for testing).

As a quick sidebar, the observe combinator in the above is also new. It works almost exactly like to, except it doesn't consume the values in the stream. This is really nice, because it allows you to wire up multiple sinks to the same stream, with each one seeing the same values. It also allows you to very easily debug your processes, since it's pretty trivial to just inject a sink that prints values to stdout as they flow through the stream, but without disrupting the stream itself. For example:

val stuff: Process[Task, Value] = ???

// without debugging
stuff flatMap doThingsWithStuff filter throwStuffAway ...

// with debugging (using the stdout sink from earlier)
stuff observe stdout flatMap doThingsWithStuff filter throwStuffAway ...

Very non-invasive, and very easy. I write logging sinks all the time and just drop them in using observe.

Anyway, coming back to our analytics example... One of the things you may notice is that we're merging the Twitter and Facebook streams before we perform the sentiment analysis. This makes a lot of sense from a code structure standpoint, since our processed Twitter and Facebook streams are just producing text that we handle in a uniform way. However, even though the merge combinator is racing its two input streams against each other, the output of merge is once again strongly ordered! The merge combinator doesn't somehow produce a stream that magically runs itself in parallel, and this sets up an obvious point where we can optimize: why not run computeSentiment in parallel?

Here's the implicit observation in the above: we don't really care about the order in which people tweet or post about Evil Corp, all we care about is that they posted and what sentiment it conveyed. Presumably, any analytics that we're performing over our "social media footprint" is going to be agnostic to whether or not we reordered one person's tweet relative to another person's unrelated Facebook post that happened at almost the same time. Thus, since we don't really care about the order of our output, at least not within some sort of reasonably small time window, we can inform scalaz-stream that it's ok to relax the sequentiality just a bit, gaining concurrency (and throughput!) as a result.

To be clear, what we're trying to do here is get concurrency within a single stream. We're not talking about running two streams simultaneously; we're talking about running multiple transformations against elements of a single stream concurrently. This is directly in conflict with scalaz-stream's promise that Process is a sequence, so how do we achieve this (apparently) very desirable result?

The answer is another combinator: merge.mergeN. This combinator is kind of the golden hammer of scalaz-stream's concurrency support, and it is a powerful hammer indeed. It has the following (simplified) type signature:

def mergeN[A](ps: Process[Task, Process[Task, A]]): Process[Task, A]

So it's a bit like flattening a Seq. It takes a "stream of streams" and runs the inner streams concurrently, producing output in a single flattened stream as fast as it becomes available. Of course, there's a very real risk here that the outer stream (the stream that contains other streams) might run too fast, giving us new work to do before we're ready for it and ultimately running us out of memory. In order to avoid this case, the real mergeN takes some tuning parameters (such as maxOpen) that can be used to reign in some of the concurrency as required.

So how do we use mergeN to solve our asynchronous computeSentiment problem? After all, we don't really have a "stream of streams"; we just have a regular old stream. The answer is that we need to do a bit of juggling before we call computeSentiment in order to effectively "inform" scalaz-stream that we're trying to run things in parallel:

val aboutUs: Process[Task, String] = fbText merge twText filter { _ contains "Evil Corp" }

val ps: Process[Task, Process[Task, Double]] = aboutUs map { text =>
  Task { computeSentiment(text) }
} map { t => Process eval t }

val sentiment: Process[Task, Double] = merge.mergeN(ps)

This is obviously a lot bulkier than before, but that sort of makes sense since we're doing something very specialized with computeSentiment. Remember, a core tenant of scalaz-stream is that you must be explicit whenever you want nondeterministic behavior. It's not going to just jump off a cliff based on a guess that you might want one thing or another. You need to tell it what you want in no uncertain terms.

To unpack the above, what we're doing is lifting computeSentiment(text) into a Task. Now this might seem a bit weird, since of course Task usually represents an effect like talking to the filesystem or launching the missiles at Rival Corp. However, if you think about it, concurrency is sort of like an effect! Just like with talking to the filesystem, we want to be very clear that we really want to be asynchronous, otherwise we might accidentally read from and write to a file in the wrong order, or our test suite might need to have some sort of crazy timeout in order to properly check that we did things. Running things in parallel is an effect that needs to be controlled for the same reason as any other effect, and scalaz-stream makes this very explicit.

After lifting computeSentiment(text) into a Task, we eval that Task into a Process (which only contains that one sentiment analysis!). Now we have an outer Process that represents our filtered Twitter and Facebook streams (merged), and a bunch of inner streams that just perform the sentiment analysis. We feed this to mergeN (which is in the scalaz.stream.merge module) and the result is a single stream, sentiment, that contains all of our sentiment analysis. This stream is now strongly ordered and sequential (it's not going to magically change ordering on us just because it came from mergeN!), but the order almost certainly doesn't line up with the aboutUs stream. This is because sentiment analysis on certain text fragments might take a bit longer than on other fragments, and the quicker fragments are going to "win the race" and produce their sentiment analysis faster. This is of course, within the bounds of fairness and scheduling (something that mergeN takes great pains to get right), but you get the idea.

So when we want to relax the sequentiality of Process and run things in parallel, we can using mergeN. We have to work a bit in order to convince scalaz-stream that we're really sure about it, but we can absolutely get it done. One of the interesting things about this pattern is that applies uniformly to a lot of other natural use cases for parallel processing within a single stream. For example, an obvious way to model a server would be a stream of streams, where the inner streams represent the data coming from a particular client, and the outer is the stream of connections from different clients. A very, very common pattern with this sort of server-side programming is to use mergeN to collapse all of these streams together, handling connections in parallel (because that's what mergeN does!) within our maxOpen bound.

val server: Process[Task, Process[Task, ByteVector]] = ???

merge.mergeN(server) to (io fileChunkW "firehose.bin")

The ByteVector type in the above is something you might see a fair bit floating around scalaz-stream. Just think of it like a functional, fast Array[Byte]. If you want to read more about it, you can look up the scodec-bits project.

No need to explicitly fork off a thread to handle each client connection as we would need to do in a traditional server implementation! We just describe to scalaz-stream semantically what we want, which is to say handling all of the client connections in parallel, and it just goes off and does it for us! Composable combinators make this sort of thing very easy and very predictable.

Coordination and Flipping

Let's think about the following "real-world" sort of use case:

// gets called from the outside world, perhaps by a server socket or a UI
def receiveData(data: String): Unit = {
  // we're going to get called a lot, and we ideally want to process this stream of data with scalaz-stream
  // how can we get all of the data we receive *into* a Process?
}

This is a bit of a tricky question, since Process doesn't provide a mutable insert function. You can append two processes together using ++, but that operation returns a new Process without modifying the old ones. How do we "inject" data into a stream?

The answer to this is surprisingly straightforward and classical: we use a queue. Every time receiveData gets called, we take its value and dump it on a queue. Then, somewhere else, we have a Process that just sources its data from that queue, allowing us to process the firehose of data we receive using a clean and compositional framework.

This is a pretty natural solution to the problem, and something that should seem familiar to anyone who has done server-side development. Given that we already understand how to safely create processes from mutable sources (Task ftw!), it shouldn't be too difficult to implement.

val q = new ArrayBlockingQueue[String](10)    // never, EVER have an unbounded queue!

def receiveData(data: String): Unit =
  q.offer(data)

val received: Process[Task, String] = Process repeatEval (Task { q.take() })

So far, so good. However, you'll notice one really ugly thing in the middle here: q.take() is a blocking operation. Of course it does suspend the thread, so we're not eating up CPU resources, but we are eating up a member of our very finite thread pool! Do this sort of thing with enough processes simultaneously and you'll have a major thread starvation problem. Of course, we could use q.poll() and maybe employ some sort of timeout, but that's just taking a Thread.sleep and turning it into a busy-wait. Neither of these are particularly appealing solutions.

The answer to the problem can be found inside of scalaz-stream. It turns out that the above use case is so common that scalaz-stream has a custom queue implementation, one which doesn't ever block! This works through the magic of Task.async, but we don't really need to care about that at a high level. From a usage standpoint, scalaz-stream's queue looks exactly the same as a regular queue, except that we never ever block a thread waiting for data, so our throughput is extremely good even with dozens (or hundreds!) of queues floating around our application.

val q = async.boundedQueue[String](10)       // still a bad idea to have an unbounded queue!

def receiveData(data: String): Unit =
  q.enqueueOne(data).run         // enqueueOne returns a Task, so we need to run it

val received: Process[Task, String] = q.dequeue

It's that simple! We enqueue data using either enqueueOne, which takes a single value and returns a Task[Unit], or using the enqueue sink, which is a Sink[Task, String] (in our above example, anyway). Using the enqueue sink makes a lot of sense if we're trying to take one (or more!) processes and dump their data out into a queue for later processing. Using enqueueOne makes the most sense for use-cases like the above, where we're trying to interface with some sort of external notifier.

Calling q.dequeue gives us a process that reads its values sequentially off of the queue. There's nothing magical about this resulting stream; it's still a Process! In fact, it will look very very much like the repeatEval process that we implemented in our first attempt on top of ArrayBlockingQueue. The only real difference is that the q.dequeue process is never going to block when the queue is empty. Instead it just…stops running. No threads are harmed in the stopping of this process. As soon as new data is available in the queue, received will immediately spring back to life and handle the data as it arrives. It's perfect for these sorts of uses, and in real world scalaz-stream applications, async.boundedQueue is one of the most commonly applied tools.

Unfortunately, as cool as async.boundedQueue is, it doesn't solve every problem. The weakness of async.boundedQueue becomes apparent if you ever try to use more than one q.dequeue stream at the same time. As with a traditional queue, dequeuing from multiple threads simultaneously does not give all threads the same data! This may seem a bit obvious when said in this way, but the abstractions of scalaz-stream can make it appear a bit unintuitive.

val q = async.boundedQueue[Int](10)     // have I mentioned how important it is to specify a bound?

val left = q.dequeue
val right = q.dequeue

(Process(1, 2, 3, 4, 5) to q.enqueue).run.run     // put a bunch of data in the queue

val both = (left map { i => s"left: $i" }) merge (right map { i => s"right: $i" })
(both take 5).runLog.run             // => uh...could be anything!

The results at the end are going to be very non-deterministic. I ran this a few times and got the following results:

  • Vector(left: 2, right: 1, right: 3, left: 4, left: 5)
  • Vector(left: 1, right: 2, right: 3, left: 4, left: 5)
  • Vector(right: 2, left: 1, right: 4, left: 3, left: 5)
  • Vector(left: 1, right: 2, right: 3, left: 4, left: 5)

You get the picture. The point is that, in the above, you never see a value which is received by both left and right. They're getting different data! In fact, not only are they getting different data, but they are guaranteed to get different data! When one process dequeues a value, it is an atomic operation and no other process is allowed to get the same data. This is an important property of queues, but it can be just a bit surprising.

In any case, sometimes you really really do need to have a queue that can have multiple subscribers all receiving the same data. For example, if you're writing a group chat application, your chat server needs to have something inside of it that feeds incoming messages back out to all clients equally, not just one randomly-selected client! Scalaz-stream isn't blind to this use-case, and it provides a special construct called a topic which fits the bill. We can use this to build a very simple group chat server, just as I mentioned:

val connections: Process[Task, Process[Task, Exchange[ByteVector, ByteVector]]] =
  Netty server (new InetSocketAddress("localhost", 9000)) map {
    case (_, client) => client
  }

val room = async.topic[ByteVector]      // a room is a shared multi-queue of messages, which are just raw bytes

val server: Process[Task, Unit] = merge.mergeN(connections map { client =>
  client flatMap { ex =>
    val incoming = ex.read to room.publish
    val outgoing = room.subscribe to ex.write
    incoming merge outgoing
  }
})

There's a few new things in the above, but they're all really straightforward and built on what we've seen before. The weirdest one is this whole Netty server thing. This isn't something that is built into scalaz-stream, but it is built on top of scalaz-stream and available in the scalaz-netty project. It's basically just a scalaz-stream wrapper around the Netty NIO framework with a very simple API.

We've already seen ByteVector, but we haven't yet seen Exchange. This is something that's built into scalaz-stream, and it's really just a pair of a source and a sink. The read side of the exchange is a Process[Task, ByteVector], while the write side is a Sink[Task, ByteVector]. This very directly represents the bidirectional nature of a socket connection. Semantically, we want to be able to read data from the client and put it into the chat room, while simultaneously listening for new data in the chat room and writing it back down to the client.

This is where Topic comes in. We create our chat room up towards the top using async.topic[ByteVector]. Topic is simply a "multi-queue", where the defining property is that we can subscribe to the topic as many times as we want, and each subscriber will get the same data. Note that the data backlog is not saved; new clients will not see old messages (so this is IRC, not HipChat). We subscribe to the Topic and dump its contents directly into the write side of the client Exchange, while simultaneously taking the read side of the client and dumping it into the publish for the Topic (which is really just an enqueue sink). We want to read from and write to the client simultaneously, so we merge both sides of the operation. Finally, we want to be able to handle multiple client connections simultaneously, so we run mergeN over the whole server to produce the final result. Very simple, declarative and resource-safe!

Backpressure

Backpressure is one of those things that most people don't really think about until the first time they work on a distributed application with multiple services, all threading data from one place to another. It's a problem of scale that just doesn't occur in a simple test bed, but it is nonetheless an enormously important question to answer. For anyone who has done distributed services at scale, backpressure is usually the very first worry that you have, and deservedly so. The good news is that scalaz-stream, by its very design, supports backpressure in an extremely elegant and seamless fashion…mostly.

Wikipedia's "article" on backpressure is uncharacteristically uninformative, especially given its importance in modern systems architecture. Simply put, backpressure comes from accepting the fact that all servers have finite resources. We can write a server that handles a thousand connections per second, but if we open up the faucet a bit and crank that up to a million, or ten million, or a hundred million per second, that same server is going to start to falter. In this world of massive scale in data and distribution, we need to think very seriously about what it means to receive requests too fast for us to keep up. This is also the essential property of most DDoS attacks, so we're talking about something which matters for security as much as it matters for availability and performance!

Backpressure is the notion that our server (which is failing to keep up!) should somehow signal back to the clients that it is unable to keep up with the load and they need to ease up a bit. In a correctly designed system, the clients would then slow down and signal this throughput reduction back to their data sources. In other words, rather than having any single point in a system become overloaded and backed up, the entire system slows down gracefully and only handles the maximum that it can handle, without attempting to bite off more than it can chew.

As you can imagine, this turns out to be a somewhat hard problem to solve in distributed systems architecture. You need to be very careful in the way that you design not only your services but also the clients of those services, since backpressure does absolutely no good if the clients ignore the server's desperate pleas to slow down! (see also: DDoS attacks) First-class support for backpressure is an increasingly important feature of any framework which deals with streams of data, and scalaz-stream is no exception to this.

The good news is that scalaz-stream is designed from the ground up to naturally support backpressure without any configuration or tweaking from the user. In fact, all (but one) of our examples thus far have natively supported graceful backpressure without any explicit configuration on our part! This magic exists because scalaz-stream has a pull-based evaluation model.

When we use one of the scalaz-stream interpreters (e.g. runLog or run) to run a stream sequentially, the interpreter works in the following way:

  1. While input.hasNext
    1. Request next thing (data or effect) from input
    2. Evaluate any effects associated with thing
    3. Repeat

This is very straightforward and intuitive. In fact, it's probably the way you would write a Process interpreter on your own given no other information about how the library works. However, there's something very profound buried in this pseudocode: we only request the next thing after we are done processing the current thing. This is what gives Process its strongly-sequential nature, and it's also what defines its native support for backpressure.

If we're a server that is written using scalaz-stream, and we're struggling to keep up with the data flow being forced upon us by other services, we are naturally going to take longer to handle each thing in our interpreter. As we take longer to evaluate each thing, we take longer to request the next thing, and everything up the chain slows down! Remember that scalaz-stream is a fully lazy framework; there's no "magic lookahead" that computes things that we haven't asked for yet. Thus, as we slow down, our requests slow down, and our input slows down, and our backpressure propagates.

Of course, things do become a bit more complicated when you have concurrency and external push data sources to deal with. We've already seen how these things work, though! For example, mergeN sort of disrupts this pull model a bit, since it's running a whole bunch of processes all at once without waiting for us to request the next thing. That's ok though, because inside of mergeN is a bounded queue of data that is waiting to be pulled by the output stream (there is something similar inside of merge). As soon as that bounded queue fills up, mergeN stops and no longer runs its constituent processes! Thus, if mergeN is given a stream of constituent processes that run too fast for us, the internal queue will fill up and prevent mergeN from running away with all our memory, which in turn slows down the constituent processes (since mergeN is the one pulling on them!), which in turn slows down our upstream data sources. The same logic applies to an external interface point using async.boundedQueue.

This is precisely why it is so very important to always, ALWAYS supply a finite bound when you create an async.boundedQueue. Never, ever create an unbounded queue. An unbounded queue is basically a black hole which will fill and fill and fill and ultimately eat up all of your server's memory in the event that data is coming in faster than we can handle it. Always bound your queues, and don't be afraid to give a very small bound. I generally go with 10, but single digit bounds are not inappropriate! Unbounded queues are only appropriate on servers with unbounded memory, and servers with unbounded memory only exist in an infinite universe, which is not the universe we live in! Bound your queues. Always bound your queues.

I hinted earlier that all but one of our examples in this article support backpressure "out of the box", without any configuration from us. This is true, and the one example that doesn't support backpressure is the chat server. Contrary to expectations, the lack of backpressure isn't coming from the fact that we're using Netty and NIO (in fact, NIO supports backpressure extremely gracefully thanks to the way TCP is designed). The lack of backpressure actually comes from the fact that we're using Topic!

The problem is that Topic is a multi-queue with many subscribers and publishers. It becomes a really tricky question of what backpressure semantics even make sense. If one subscriber is running slowly, do we want to slow down all the subscribers? Which publisher do we slow down? What about fairness? All of these things are really tricky questions and it's hard to get this right. At present, Topic is punting on that whole mess and basically ignoring backpressure. Inside of Topic is a completely unbounded queue, and that queue can and will fill up if your services start slowing down! Be very, very, very careful of this. There is an open issue to address this design flaw, but at present it has not been fixed.

The good news is that you're going to use Queue a heck of a lot more than you'll use Topic, and Queue does support backpressure in exactly the way you want – as long as you don't give it an infinite bound!

Learning More

There's plenty more to learn about scalaz-stream, but the best way to learn it is just jump in with both feet and experience it for yourself! Ask questions on Twitter or in the Gitter room. File issues if (when!) you find bugs or behavior that you think is wrong. Maybe it is wrong! Scalaz-stream is under very active development, and a lot of ideas are being vetted and toyed with. Things are changing and evolving as we speak; within a few months even this article will be outdated!

The absolute best way to achieve guru status in scalaz-stream is to dive into the source code and start patching things. There's plenty of work to do, and plenty of easy points of entry to the code base. Outside of three very complex functions (wye.apply, nondeterminism.njoin and stepAsync), the scalaz-stream codebase is remarkably straightforward with very few surprises. Don't be afraid to poke around at things and see what happens!

Incidentally, if you're reading this as a Github Gist, be warned that the comment system on Gist is not connected to any sort of notifications! So if you comment on this as a Gist, I probably won't ever see it. Twitter is a much more effective place to ask questions.

@ghost

ghost commented Mar 24, 2015

Minor nitpick: "tenant" -> "tenet." :-) There's at least one "well" -> "will" in there, too.

rks987 commented Mar 28, 2015

Well if we're doing minor nitpicks, "contemporaneous" probably isn't what you want. Maybe "up to date" will do.

Thanks for this Daniel, much appreciated.

Since we're nitpicking: your assertion that our universe is not infinite is unfounded, and even dangerous.

Also, thanks for writing this.

Thank you so much Daniel. This really helped (as did your other articles and presentations).

Cheers!!!

mkrajc commented Dec 1, 2015

Thank for the walkthrough. It helped me a lot to start working with library.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment