Skip to content

Instantly share code, notes, and snippets.

@dylemma
Last active August 16, 2018 21:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dylemma/b81bd58545dee233c6d982b47626f3c8 to your computer and use it in GitHub Desktop.
A bounded queue with blocking push and async pull, using Monix
import java.util.concurrent.Semaphore
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.Promise
import monix.eval.Task
import monix.execution.atomic.AtomicAny
import monix.execution.atomic.PaddingStrategy.LeftRight128
import monix.tail.Iterant
// roughly based on Monix's AsyncQueue implementation
// roughly based on Monix's AsyncQueue implementation
object BlockingQueueIterant {

  /** The sending side of a channel created by `BlockingQueueIterant.apply`.
    *
    * Both methods block the calling thread while the channel's buffer is full.
    */
  trait Producer[T] {

    /** Sends `item` to the receiving Iterant; blocks if the queue is full. */
    def push(item: T): Unit

    /** Sends the end-of-stream marker, after which the receiving Iterant completes
      * once the previously sent items have been emitted; blocks if the queue is full.
      */
    def signalEnd(): Unit
  }

  /** Creates a send/receive channel where the "send" operations block if the receiver is slow,
    * and the "receive" side is modeled as an Iterant.
    *
    * Used to bridge push-style code that emits items from a blocking callback
    * (e.g. a `foreach`) to a pull-style `Iterant` consumer, without buffering
    * more than `capacity` items in memory.
    *
    * @param capacity maximum number of sent-but-not-yet-received signals buffered before
    *                 `push`/`signalEnd` block; must be positive
    * @tparam T the type of items sent through the channel
    * @return the producer (send side) paired with an Iterant that emits the pushed items
    *         and completes after `signalEnd()`
    * @throws IllegalArgumentException if `capacity` is not positive
    */
  def apply[T](capacity: Int): (Producer[T], Iterant[Task, T]) = {
    // With no permits the very first send would block forever: a sender must
    // acquire a permit before it can even hand off to a waiting receiver.
    require(capacity > 0, s"capacity must be positive, but was $capacity")

    // One permit per signal that may sit in the `Sending` queue. A sender acquires a
    // permit before touching the state. The permit is returned when `receive` dequeues
    // the signal, when the signal is handed directly to a waiting receiver, or when a
    // lost CAS forces the sender to retry.
    val sendSemaphore = new Semaphore(capacity)
    val stateRef = AtomicAny.withPadding[State[T]](Empty, LeftRight128)

    // Set once the end-of-stream marker (None) has been committed to the state.
    // NOTE(review): this is a plain, unsynchronized var, so the "no sends after
    // signalEnd()" check is only reliable when a single thread drives the Producer.
    var didSendEOF = false

    // Commits `signal` to the channel (`Some(item)` for data, `None` for end-of-stream).
    // Blocks while the buffer is full and retries on concurrent state modification.
    @tailrec def send(signal: Option[T]): Unit = {
      if (didSendEOF) {
        throw new IllegalArgumentException("send may not be called after signalEnd()")
      }
      sendSemaphore.acquire()
      stateRef.get match {
        // empty -> start a Sending queue; the acquired permit stays held by the queued signal
        case current @ Empty =>
          val nextState = Sending(Queue(signal))
          if (stateRef.compareAndSet(current, nextState)) {
            // set the EOF flag now that a None has successfully been "sent"
            if (signal.isEmpty) didSendEOF = true
          } else {
            // lost the race: give the permit back, then try again
            sendSemaphore.release()
            send(signal)
          }

        // waiting -> satisfy the oldest promise directly
        case current @ Awaiting(promises: Queue[Promise[Option[T]]]) =>
          if (promises.isEmpty) {
            // treat it as an `Empty` state; we're retrying either way, but only
            // modifying the state if nobody else got to it while we were checking things
            stateRef.compareAndSet(current, Empty)
            // the retry acquires its own permit, so return this one (otherwise it leaks)
            sendSemaphore.release()
            send(signal)
          } else {
            val (promise, tail) = promises.dequeue
            val nextState = if (tail.isEmpty) Empty else Awaiting(tail)
            if (stateRef.compareAndSet(current, nextState)) {
              // set the EOF flag now that a None has successfully been "sent"
              if (signal.isEmpty) didSendEOF = true
              promise.success(signal)
              // the signal went straight to a receiver and occupies no buffer slot
              sendSemaphore.release()
            } else {
              // lost the race: give the permit back (otherwise it leaks), then try again
              sendSemaphore.release()
              send(signal)
            }
          }

        // sending -> append to the sending queue
        case current @ Sending(queue) =>
          // if we managed to acquire a permit in this state,
          // there must be room in the queue for one more signal
          val nextState = Sending(queue enqueue signal)
          if (stateRef.compareAndSet(current, nextState)) {
            // set the EOF flag now that a None has successfully been "sent"
            if (signal.isEmpty) didSendEOF = true
          } else {
            // lost the race: give the permit back, then try again
            sendSemaphore.release()
            send(signal)
          }
      }
    }

    // Takes the next signal: immediately if one is queued, otherwise as a Task that a
    // later `send` completes. Retries on concurrent state modification.
    @tailrec def receive(): Task[Option[T]] = {
      stateRef.get match {
        // empty -> register the first waiting promise
        case current @ Empty =>
          val promise = Promise[Option[T]]()
          val nextState = Awaiting(Queue(promise))
          if (stateRef.compareAndSet(current, nextState)) {
            // successfully changed the state - return a Task wrapping the promise
            Task fromFuture promise.future
          } else {
            // concurrent state modification -> retry
            receive()
          }

        // awaiting -> enqueue another waiting promise
        case current @ Awaiting(promises: Queue[Promise[Option[T]]]) =>
          val promise = Promise[Option[T]]()
          val nextState = Awaiting(promises enqueue promise)
          if (stateRef.compareAndSet(current, nextState)) {
            // successfully changed the state - return a Task wrapping the promise
            Task fromFuture promise.future
          } else {
            // concurrent state modification -> retry
            receive()
          }

        // sending -> take the oldest queued signal
        case current @ Sending(queue) =>
          if (queue.isEmpty) {
            // treat it as an `Empty` state; we're retrying either way, but only
            // modifying the state if nobody else got to it while we were checking things
            stateRef.compareAndSet(current, Empty)
            receive()
          } else {
            val (signal, tail) = queue.dequeue
            val nextState = if (tail.isEmpty) Empty else Sending(tail)
            if (stateRef.compareAndSet(current, nextState)) {
              // the signal left the buffer, so free its slot for a blocked sender
              sendSemaphore.release()
              Task.now(signal)
            } else {
              // concurrent state modification -> retry
              receive()
            }
          }
      }
    }

    // A Task[Iterant] that keeps calling `receive()` forever; `defer` delays each
    // `receive()` until the Task is actually run.
    def nextReceives: Task[Iterant[Task, Option[T]]] = Task
      .defer { receive() }
      .map { signal => Iterant.Next(signal, nextReceives, Task.unit) }

    // None marks end-of-stream: stop there and unwrap the data items
    val receiveStream = Iterant.Suspend(nextReceives, Task.unit)
      .takeWhile(_.isDefined)
      .collect { case Some(item) => item }

    val producer = new Producer[T] {
      def push(item: T): Unit = send(Some(item))
      def signalEnd(): Unit = send(None)
    }

    producer -> receiveStream
  }

  // Channel state: either signals are buffered waiting for receivers (Sending),
  // receivers are waiting for signals (Awaiting), or neither (Empty).
  private sealed trait State[+T]
  private case object Empty extends State[Nothing]
  // Signals sent but not yet received, oldest first; `None` marks end-of-stream.
  private final case class Sending[T](signals: Queue[Option[T]]) extends State[T]
  // Receivers waiting for the next signal, oldest first.
  private final case class Awaiting[T](promises: Queue[Promise[Option[T]]]) extends State[T]
}
import scala.concurrent.duration._
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
object ExampleObservable {

  /** Prints `msg` prefixed with a pseudo-timestamp: the current time in milliseconds
    * modulo 10000, zero-padded to five digits.
    */
  def log(msg: String): Unit = {
    val now = System.currentTimeMillis % 10000
    println(f"[${now}%05d] $msg")
  }

  /** Demo: a producer thread pushes 1..10 and then EOF into a channel of capacity 2,
    * while the consumer spends about one second per item, so the producer repeatedly
    * blocks on the full queue.
    */
  def main(args: Array[String]): Unit = {
    val (producer, consumer) = BlockingQueueIterant[Int](2)

    // run the producer on a separate thread, since we know it's gonna block
    val producerThread = new Thread(new Runnable {
      def run(): Unit = {
        for (i <- 1 to 10) {
          log(s"Sending $i...")
          producer.push(i)
          log(s" ...sent $i")
        }
        log("Sending EOF...")
        producer.signalEnd()
        log(" ... sent EOF")
      }
    })
    // If the consumer fails or times out, the producer may be stuck forever on a full
    // queue; as a daemon thread it won't keep the JVM alive in that case.
    producerThread.setDaemon(true)
    producerThread.start()

    // consume 1 item per second
    val slowDataPipeline = consumer.mapEval { i =>
      log(s"Received $i")
      Task.sleep(1.second).map { _ =>
        log(s"Finished processing $i")
      }
    }

    // run the consumer to completion
    slowDataPipeline.completeL.runSyncUnsafe(20.seconds)
    // on success, let the producer finish its final log lines before exiting
    producerThread.join()
  }
}
@dylemma
Copy link
Author

dylemma commented Aug 16, 2018

This should be helpful for translating code in the style of

def foreach(onItem: T => Unit): Unit

into

def items: Iterant[Task, T]

With the foreach (Traversable) style, items are pushed downstream, so the only way to express backpressure is to block the caller.
Translating such code to pull style requires one of two things:

  • Building your stream into a collection, then getting an iterator over that collection
  • Blocking

My use case assumes that the entire collection might not fit in memory, so I have to block somewhere.
I do so by blocking on push operations into a bounded queue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment