Skip to content

Instantly share code, notes, and snippets.

@pchlupacek
Last active December 20, 2015 21:39
Show Gist options
  • Save pchlupacek/6199638 to your computer and use it in GitHub Desktop.
Save pchlupacek/6199638 to your computer and use it in GitHub Desktop.
Attempt to integrate process with outside world in nonblocking way
package com.spinoco.util.streams
import java.util.concurrent.{LinkedBlockingQueue, ConcurrentLinkedQueue}
import scalaz.stream.Process
import scalaz.stream.processes._
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scalaz.concurrent.Task
import scalaz.\/
import scalaz.syntax.id._
import java.util
/**
*
* User: pach
* Date: 8/9/13
* Time: 5:53 PM
* (c) 2011-2013 Spinoco Czech Republic, a.s.
*/
/**
* Asynchronous Queue that allows asynchronously push to the Process. Items from the
* Queue can be only taken by the accompanying `Process[Task,A]`.
*
* Source process would not block on the queue when the queue is empty, but will process
* any elements present in the queue.
*
* If the queue is empty, process will register its call back that will eventually make
* next step of the process once the new A will appear in the Queue.
*
*
* Also please note that once Source is stopped, queue will reject all enqueue operations
*
*
*/
trait ProcessQueue[A] {
/*
* This implementation is based on the assumption that process never reads from two threads
* simultaneously.
*/
import Task._
private[streams] val q: util.Queue[A]
private[streams] val cb = new AtomicReference[Option[A => Unit]](None)
private[streams] val qs = new AtomicInteger(0)
//set to true if, the process has terminated
@volatile private[streams] var processTerminated = false
/**
* Gets the size of the queue. The operation is constant time, but may not be 100% accurate.
* Specifically queue size may be slightly larger than actually this method reports
* @return
*/
def size: Int = qs.get
/**
* Indicates that the underlying process terminated. The enqueue method will fail when this returns true
* @return
*/
def terminated: Boolean = processTerminated
/**
* Multiple threads can enqueue messages to this queue. If the process is not taking
* messages from this queue, the thread that enqueue have opportunity to check size of the queue
* via [[com.spinoco.util.streams.ProcessQueue.size]] method and eventually throttle the speed
* of the insertion.
* Alternatively every enqueue operation return estimated size if the enqueue was successful or the exception
* indicating that A's cannot get enqueue.
*
* @param a item to enqueue
* @return either current estimated size of the queue after insertion or the failure when
* the queue is not ready to accept more elements
*/
def enqueue(a: A): Throwable \/ Int = {
//to preserve order we must offer in q first
if (!processTerminated) {
if (addToQueue(a)) {
qs.incrementAndGet()
tryProcess
qs.get.right
} else {
new IllegalStateException("Maximum size of the queue exceeded").left
}
} else {
new IllegalStateException("Process already terminated").left
}
}
/**
* Safely inserts to queue, returning true if insert succeeds, false if not
* @param a
* @return
*/
private[streams] def addToQueue(a: A): Boolean
private def tryProcess = {
def go(): Unit = {
if (q.peek() != null) {
cb.getAndSet(None) match {
case scb@Some(callback) =>
q.poll() match {
case null =>
// it very rare scenarios this is valid state so lets put callback back in the position
// and retry
cb.set(scb)
go
case a =>
callback(a)
qs.decrementAndGet()
}
case None => //no callback process must get that from queue
}
}
}
go
}
/**
* Produces the process that can be used as source of A's to be further processed.
*
* @return
*/
private[streams] def makeSource: Process[Task, A] = resource[Unit, A](now())(_ => delay {
processTerminated = true
})({
_ =>
async {
(register: Throwable \/ A => Unit) =>
val ourCb: Option[A => Unit] = Some(a => register(a.right))
cb.set(ourCb)
if (q.peek() != null) {
if (cb.compareAndSet(ourCb, None)) {
//ok we have 'a' in queue and we are owner of the callback
//that means nobody else in the world can get that 'a' and process it
//but to be 100% safe we just register case for null situation here
q.poll match {
case null => register(new RuntimeException("Unexpected state, leaking queue :-)").left)
case a =>
register(a.right)
qs.decrementAndGet()
}
}
}
}
})
}
object ProcessQueue {
def unbounded[A]: (ProcessQueue[A], Process[Task, A]) = {
val pq = new ProcessQueue[A] {
private[streams] val q: util.Queue[A] = new ConcurrentLinkedQueue[A]()
private[streams] def addToQueue(a: A): Boolean = q.offer(a)
}
(pq, pq.makeSource)
}
def bounded[A](maxSize: Int): (ProcessQueue[A], Process[Task, A]) = {
val pq = new ProcessQueue[A] {
private[streams] val q: util.Queue[A] = new LinkedBlockingQueue[A](maxSize)
private[streams] def addToQueue(a: A): Boolean = q.offer(a)
}
(pq, pq.makeSource)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment