Last active
August 16, 2018 21:55
-
-
Save dylemma/b81bd58545dee233c6d982b47626f3c8 to your computer and use it in GitHub Desktop.
A bounded queue with blocking push and async pull, using Monix
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.Semaphore | |
import scala.annotation.tailrec | |
import scala.collection.immutable.Queue | |
import scala.concurrent.Promise | |
import monix.eval.Task | |
import monix.execution.atomic.AtomicAny | |
import monix.execution.atomic.PaddingStrategy.LeftRight128 | |
import monix.tail.Iterant | |
// roughly based on Monix's AsyncQueue implementation | |
object BlockingQueueIterant {

  /** The "send" half of the channel created by [[BlockingQueueIterant.apply]].
    *
    * Both operations block the calling thread while no permit is available,
    * i.e. while the buffer is full. Calling either operation after an
    * end-of-stream marker has been sent throws `IllegalArgumentException`.
    */
  trait Producer[T] {
    def push(item: T): Unit // blocks if queue is full
    def signalEnd(): Unit // blocks if queue is full
  }

  /** Creates a send/receive channel where the "send" operations block if the receiver is slow,
    * and the "receive" side is modeled as an Iterant.
    *
    * Used to bridge the gap between code written in a blocking, push-based (`foreach`) style
    * and code that consumes its input by pulling from an `Iterant`.
    *
    * Internally a single atomic reference holds one of three states: `Empty`, `Sending`
    * (signals buffered, waiting to be received) or `Awaiting` (receivers waiting for a signal).
    * Items travel as `Some(item)`; the end of the stream travels as `None`.
    *
    * @param capacity number of permits of the internal semaphore, i.e. how many signals
    *                 (items plus the end-of-stream marker) may sit in the buffer before
    *                 `push` / `signalEnd` block
    * @tparam T the type of the items sent through the channel
    * @return the producer used to push items, paired with the `Iterant` that emits every
    *         pushed item and terminates once the end-of-stream marker is received
    */
  def apply[T](capacity: Int): (Producer[T], Iterant[Task, T]) = {
    val sendSemaphore = new Semaphore(capacity)
    val stateRef = AtomicAny.withPadding[State[T]](Empty, LeftRight128)

    // NOTE(review): plain captured `var`, written and read only from `send`.
    // Presumably fine for a single producer thread; confirm before sharing a
    // Producer between threads.
    var didSendEOF = false

    /** Sends one signal (`Some(item)` or the `None` end marker), blocking until a permit is free.
      *
      * Exactly one permit is acquired per call, before the CAS retry loop starts.
      * Re-acquiring on every retry (as an earlier revision did from the `Awaiting`
      * branch) leaked a permit each time a CAS race was lost, shrinking the
      * effective capacity until producers blocked forever.
      *
      * The permit is kept while the signal sits in the buffer (it is released by
      * `receive`), or released right away when the signal is handed straight to a
      * waiting receiver.
      */
    def send(signal: Option[T]): Unit = {
      if (didSendEOF) {
        throw new IllegalArgumentException("send(None) may not be called twice")
      }
      sendSemaphore.acquire()

      @tailrec def offer(): Unit = stateRef.get match {
        // empty -> start a Sending queue holding just this signal
        case current @ Empty =>
          val nextState = Sending(Queue(signal))
          if (stateRef.compareAndSet(current, nextState)) {
            // set the EOF flag now that a None has successfully been "sent"
            if (signal.isEmpty) didSendEOF = true
          } else {
            // lost the race: retry, still holding our single permit
            offer()
          }

        // waiting -> satisfy the oldest promise
        case current @ Awaiting(promises: Queue[Promise[Option[T]]]) =>
          if (promises.isEmpty) {
            // treat it as an `Empty` state; we're retrying either way, but only
            // modifying the state if nobody else got to it in the meantime
            stateRef.compareAndSet(current, Empty)
            offer()
          } else {
            val (promise, tail) = promises.dequeue
            val nextState = if (tail.isEmpty) Empty else Awaiting(tail)
            if (stateRef.compareAndSet(current, nextState)) {
              // set the EOF flag now that a None has successfully been "sent"
              if (signal.isEmpty) didSendEOF = true
              promise.success(signal)
              // the signal never occupied a buffer slot, so give the permit back
              sendSemaphore.release()
            } else {
              // lost the race: retry, still holding our single permit
              offer()
            }
          }

        // sending -> append to the buffered signals
        case current @ Sending(queue) =>
          // holding a permit means there is still room in the queue
          val nextState = Sending(queue enqueue signal)
          if (stateRef.compareAndSet(current, nextState)) {
            // set the EOF flag now that a None has successfully been "sent"
            if (signal.isEmpty) didSendEOF = true
          } else {
            // lost the race: retry, still holding our single permit
            offer()
          }
      }

      offer()
    }

    /** Produces a Task for the next signal without blocking the calling thread.
      *
      * If a signal is already buffered it is dequeued, a permit is released for
      * the producer, and the signal is returned as a completed Task. Otherwise a
      * promise is registered in the `Awaiting` state and the returned Task
      * completes when a later `send` fulfils that promise.
      */
    @tailrec def receive(): Task[Option[T]] = {
      stateRef.get match {
        // empty -> register the first waiting promise
        case current @ Empty =>
          val promise = Promise[Option[T]]
          val nextState = Awaiting(Queue(promise))
          if (stateRef.compareAndSet(current, nextState)) {
            // successfully changed the state - return a Task wrapping the promise
            Task fromFuture promise.future
          } else {
            // concurrent state modification -> retry
            receive()
          }

        // awaiting -> queue up behind the promises already waiting
        case current @ Awaiting(promises: Queue[Promise[Option[T]]]) =>
          val promise = Promise[Option[T]]
          val nextState = Awaiting(promises enqueue promise)
          if (stateRef.compareAndSet(current, nextState)) {
            // successfully changed the state - return a Task wrapping the promise
            Task fromFuture promise.future
          } else {
            // concurrent state modification -> retry
            receive()
          }

        // sending -> take the oldest buffered signal
        case current @ Sending(queue) =>
          if (queue.isEmpty) {
            // treat it as an `Empty` state; we're retrying either way, but only
            // modifying the state if nobody else got to it in the meantime
            stateRef.compareAndSet(current, Empty)
            receive()
          } else {
            val (signal, tail) = queue.dequeue
            val nextState = if (tail.isEmpty) Empty else Sending(tail)
            if (stateRef.compareAndSet(current, nextState)) {
              // a buffer slot was freed: hand its permit back to the producer side
              sendSemaphore.release()
              Task.now(signal)
            } else {
              // concurrent state modification -> retry
              receive()
            }
          }
      }
    }

    // a Task[Iterant] that keeps calling `receive()` forever
    def nextReceives: Task[Iterant[Task, Option[T]]] = Task
      .defer { receive() }
      .map { signal => Iterant.Next(signal, nextReceives, Task.unit) }

    // cut the endless stream at the first `None` (the EOF marker) and unwrap the items
    val receiveStream = Iterant.Suspend(nextReceives, Task.unit)
      .takeWhile(_.isDefined)
      .collect { case Some(item) => item }

    val producer = new Producer[T] {
      def push(item: T): Unit = send(Some(item))
      def signalEnd(): Unit = send(None)
    }

    producer -> receiveStream
  }

  // Channel state held in `stateRef`.
  private sealed trait State[+T]
  // nothing buffered and nobody waiting
  private case object Empty extends State[Nothing]
  // signals sent by the producer that have not been received yet
  private case class Sending[T](signals: Queue[Option[T]]) extends State[T]
  // promises handed out by `receive()` that have not been fulfilled yet
  private case class Awaiting[T](promises: Queue[Promise[Option[T]]]) extends State[T]
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent.duration._ | |
import monix.eval.Task | |
import monix.execution.Scheduler.Implicits.global | |
object ExampleObservable {

  // println wrapper that prefixes each message with the last digits of the
  // wall clock (milliseconds modulo 10000), zero-padded to five characters
  def log(msg: String) = {
    val now = System.currentTimeMillis % 10000
    println(f"[${now}%05d] $msg")
  }

  /** Demo: a fast blocking producer feeding a consumer that takes one second per item. */
  def main(args: Array[String]): Unit = {
    val (producer, consumer) = BlockingQueueIterant[Int](2)

    // the producer blocks whenever the queue is full, so it gets its own thread
    val producerJob = new Runnable {
      def run(): Unit = {
        (1 to 10).foreach { i =>
          log(s"Sending $i...")
          producer.push(i)
          log(s" ...sent $i")
        }
        log("Sending EOF...")
        producer.signalEnd()
        log(" ... sent EOF")
      }
    }
    new Thread(producerJob).start()

    // each received item is "processed" for one second before the next one is pulled
    val throttledPipeline = consumer.mapEval { i =>
      log(s"Received $i")
      Task.sleep(1.second).map(_ => log(s"Finished processing $i"))
    }

    // drive the consumer until the stream ends, waiting at most 20 seconds
    throttledPipeline.completeL.runSyncUnsafe(20.seconds)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This should be helpful for translating code in the style of
into
With the `foreach` (Traversable) style, items are pushed downstream, and any backpressure is expressed with blocking. Translating such code to pull style requires one of two things:
My use case assumes that the entire collection may not fit into memory, so I have to choose to block somewhere.
I'm doing so by blocking on push operations into a bounded queue.