Skip to content

Instantly share code, notes, and snippets.

@pchiusano
Created December 22, 2013 19:43
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pchiusano/8087426 to your computer and use it in GitHub Desktop.
Save pchiusano/8087426 to your computer and use it in GitHub Desktop.
Binding to asynchronous processes using scalaz-stream

When creating streams from an asynchronous process, the idiomatic thing is to create a stream from that process at the earliest possible stage, rather than using a queue to invert control after the fact. See the creating streams examples - generally, you just use the Process.eval and Process.repeatEval functions to build a stream by running some asynchronous task repeatedly.

That said, if you have some existing logic that you need to bind to that's already based on callbacks and side effects, you can use the functions in scalaz.stream.async. Here's an example, using a queue to invert control:

import scalaz.stream.async

val (q, src) = async.queue[Int]

// Thread 1
q.enqueue(1) // ordinary side-effecting calls
q.enqueue(2)
...
q.close

// Thread 2
src: Process[Task,Int]
src.take(10).to(snk).run

So, async.queue returns a mutable Queue, which can be populated by the producer, and on the other side, the consumer gets back an ordinary stream. It would be preferable (but not always possible) to just modify Thread 1 to just create the stream directly, rather than having the side effects of dumping to a mutable queue.

Similarly, there's also scalaz.stream.async.signal, which represents a (possibly) continuously updated value of some type. From a signal, you can obtain the discrete stream of when it changes, or the continous current value:

import scalaz.stream.async

val batchSize = async.signal[Int]

val now: Process[Task,Int] = batchSize.continuous
val onChange: Process[Task,Int] = batchSize.discrete

// Thread 1
batchSize.value.set(10)
..
batchSize.value.set(25)

// Thread 2
batchSize.discrete.take(10).map ...

You can also bind to an actor or any other callback-y API to get back a stream:

import scalaz.concurrent.Actor
case class M(cb: Throwable \/ MoarBytes => Unit)

val a = Actor.actor[M] { case M(cb) =>
  ...
  val result = reallyExpensiveOp(r)
  cb(result)
}

val t: Process[Task,MoarBytes] =
  Process.repeatEval(Task.async { cb => a ! M(cb) })

t.filter(_.canHazBytes).map(foo).fold(..)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment