Created
May 24, 2019 18:15
-
-
Save dokwork/6e81b94ba2b9fd3726f0fd9b446a903d to your computer and use it in GitHub Desktop.
Source code for https://www.dokwork.ru/2019/05/mpscqueue.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } | |
import scala.concurrent.ExecutionContext | |
class Node[A](var a: A = null.asInstanceOf[A]) extends AtomicReference[Node[A]] { | |
def next: Node[A] = get() | |
def next_=(n: Node[A]): Unit = lazySet(n) | |
} | |
class MPSCQueue[A] { | |
private val head = new AtomicReference[Node[A]](new Node[A]()) // в самом начале head и tail | |
private val tail = new AtomicReference[Node[A]](head.get()) // указывают на одну и ту же пустую ячейку | |
def put(a: A): Unit = { | |
val n = new Node(a) | |
// получаем ссылку на последнюю добавленную ячейку, и двигаем указатель `head` на новую | |
val last = head.getAndSet(n) | |
// теперь за последней ячейкой есть еще одна | |
last.next = n | |
} | |
def pop(): Option[A] = { | |
// tail всегда указывает на пустую ячейку | |
val first = tail.get().next | |
if (first ne null) { | |
val a = first.a | |
first.a = null.asInstanceOf[A] | |
tail.lazySet(first) | |
Some(a) | |
} else None | |
} | |
def nonEmpty: Boolean = tail.get.next ne null | |
} | |
abstract class Actor[A](ec: ExecutionContext) { | |
private val mailbox = new MPSCQueue[A]() | |
// lock для синхронизации чтения mailbox | |
private val lock = new AtomicBoolean(false) | |
// Пользовательская логика обработки сообщений | |
def receive: A ⇒ Unit | |
def apply(msg: A): Unit = { | |
mailbox.put(msg) | |
// при каждом получении сообщения будем пытаться сразу его обработать | |
tryHandle() | |
} | |
private def tryHandle(): Unit = { | |
// если процесс разбора еще не был запущен, | |
// запустим его, захватив при этом lock | |
if (lock.compareAndSet(false, true)) handle() | |
} | |
private def handle(): Unit = ec.execute { | |
() ⇒ mailbox.pop() match { | |
case Some(msg) ⇒ | |
// если очередь не пуста, обработаем сообщение | |
receive(msg) | |
// и продолжим разбор mailbox | |
handle() | |
case None ⇒ | |
// когда очередь опустеет, отпустим lock | |
lock.lazySet(false) | |
// между получением результата от `pop` и обновлением lock-а | |
// могло прийти новое сообщение, но тк lock еще не был отпущен, | |
// а `pop` вернул None, цикл обработки сообщений может прерваться. | |
// чтобы этого не произошло, попытаемся запустить его снова | |
if (mailbox.nonEmpty) tryHandle() | |
} | |
} | |
} | |
object Actor { | |
def apply[A](f: A ⇒ Unit)(implicit ec: ExecutionContext): Actor[A] = new Actor[A](ec) { | |
override def receive: A ⇒ Unit = f | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment