Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import scala.concurrent.ExecutionContext
class Node[A](var a: A = null.asInstanceOf[A]) extends AtomicReference[Node[A]] {
def next: Node[A] = get()
def next_=(n: Node[A]): Unit = lazySet(n)
}
class MPSCQueue[A] {
private val head = new AtomicReference[Node[A]](new Node[A]()) // в самом начале head и tail
private val tail = new AtomicReference[Node[A]](head.get()) // указывают на одну и ту же пустую ячейку
def put(a: A): Unit = {
val n = new Node(a)
// получаем ссылку на последнюю добавленную ячейку, и двигаем указатель `head` на новую
val last = head.getAndSet(n)
// теперь за последней ячейкой есть еще одна
last.next = n
}
def pop(): Option[A] = {
// tail всегда указывает на пустую ячейку
val first = tail.get().next
if (first ne null) {
val a = first.a
first.a = null.asInstanceOf[A]
tail.lazySet(first)
Some(a)
} else None
}
def nonEmpty: Boolean = tail.get.next ne null
}
abstract class Actor[A](ec: ExecutionContext) {
private val mailbox = new MPSCQueue[A]()
// lock для синхронизации чтения mailbox
private val lock = new AtomicBoolean(false)
// Пользовательская логика обработки сообщений
def receive: A Unit
def apply(msg: A): Unit = {
mailbox.put(msg)
// при каждом получении сообщения будем пытаться сразу его обработать
tryHandle()
}
private def tryHandle(): Unit = {
// если процесс разбора еще не был запущен,
// запустим его, захватив при этом lock
if (lock.compareAndSet(false, true)) handle()
}
private def handle(): Unit = ec.execute {
() mailbox.pop() match {
case Some(msg)
// если очередь не пуста, обработаем сообщение
receive(msg)
// и продолжим разбор mailbox
handle()
case None
// когда очередь опустеет, отпустим lock
lock.lazySet(false)
// между получением результата от `pop` и обновлением lock-а
// могло прийти новое сообщение, но тк lock еще не был отпущен,
// а `pop` вернул None, цикл обработки сообщений может прерваться.
// чтобы этого не произошло, попытаемся запустить его снова
if (mailbox.nonEmpty) tryHandle()
}
}
}
object Actor {
def apply[A](f: A Unit)(implicit ec: ExecutionContext): Actor[A] = new Actor[A](ec) {
override def receive: A Unit = f
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.