В книге Functional Programming in Scala, в главе Purely functional parallelism, в конечной реализации Par[a] , авторы ссылаются на акторную модель вычислений и приводят в пример ее реализацию: Actor.scala. В основе реализации приводимого актора лежат идеи Non-intrusive MPSC node-based queue, которые не могут не вызывать восторга! В своей статье хочу попытаться разъяснить для себя и всех, кому это будет интересно, эти идеи. |
В основе акторной модели лежит идея о том, что каждый актор обладает собственным mailbox-ом, который может пополняться сообщениями в конкурентной манере, а разбираться исключительно синхронно. Для реализации mailbox-а актора нужна соответствующая Multi-Producers-Single-Consumer очередь. Давайте представим ее в виде односвязного списка с двумя заголовками
head
и tail
. Первый будет указывать на конец списка в который будут добавляться новые элементы, второй указатель будет ссылать на конец списка, с которого будет происходить чтение очереди (названия взяты из оригинальной статьи и, вообще говоря, противоречат общепринятым).Каждый элемент такого списка размещен в ячейке:
class Node[A](var a: A = null.asInstanceOf[A]) extends AtomicReference[Node[A]] {
// для наглядности опишем явно метод `next`
def next: Node[A] = get()
// и метод для изменения ссылки на следующую ячейку
def next_=(n: Node[A]): Unit = lazySet(n)
}
Запись в очередь
В самом началеhead
и tail
указывают на одну и ту же пустую ячейку:class MPSCQueue[A] {
private val head = new AtomicReference[Node[A]](new Node[A]())
private val tail = new AtomicReference[Node[A]](head.get())
}
def put(a: A): Unit = {
val n = new Node(a)
// получаем ссылку на последнюю добавленную ячейку
// и двигаем указатель `head` на новую
val last = head.getAndSet(n)
// теперь за последней ячейкой есть еще одна
last.next = n
}
Обратите внимание на то, что между изменением указателя
head
и обновлением ссылки на новую ячейку, следующую за последней, может случиться гонка, и некоторый второй поток может успеть полностью выполнить метод put
!Представим себе гонку при вставке самого первого элемента в очередь. Первый поток успевает получить ссылку на пустую ячейку S, но не успевает изменить указатель
S.next
:Теперь второй поток начинает выполнять метод
put
и в качестве последней ячейки видит ячейку N1
, но указатель head
устанавливает на новую ячейку N2
, при этом связывая N2
как следующую за N1
:В этот момент вся очередь приходит в неконсистентное состояние, т.к. со стороны указателя
tail
, с которого выполняется чтение очереди, очередь пуста!Но, когда первый поток продолжит свое выполнение, он восстановит утерянную связь между
S
и N1
(тк его указатель last
ссылается на S
, а в качестве S.next
будет задана N1
), и очередь снова станет консистентной!Чтение из очереди
Подразумевается, что чтение рассматриваемой очереди выполняется синхронно, поэтому здесь не предлагается никаких средств синхронизации. В условиях отсутствия гонки, алгоритм получения элемента из очереди крайне прост:def pop(): Option[A] = {
// tail всегда указывает на пустую ячейку
val first = tail.get().next
if (first ne null) {
val a = first.a
first.a = null.asInstanceOf[A]
tail.lazySet(first)
Some(a)
} else None
}
Реализация актора
Синхронизация чтения выполняется уже в реализации цикла обработки сообщений актора. Для этого подойдетlock
на основе AtomicBoolean
:val lock = new AtomicBoolean(false)
def tryHandle(): Unit = {
// если процесс разбора еще не был запущен,
// запустим его, захватив при этом lock
if (lock.compareAndSet(false, true)) handle()
}
def handle(): Unit = mailbox.pop() match {
case Some(msg) ⇒
// если очередь не пуста, обработаем сообщение
receive(msg)
// и продолжим разбор mailbox
handle()
case None ⇒
// когда очередь опустеет, отпустим lock
lock.lazySet(false)
}
}
def apply(msg: A): Unit = {
mailbox.put(msg)
tryHandle() // процесс будет запущен только 1 раз, благодарая lock-у
}
handle
можно выполнять на отдельном потоке из пула (на самом деле сложно однозначно сказать какое решение лучше, поэтому авторы приводят реализацию актора, в которой данное поведение задается пользователем явным образом):def handle(): Unit = ec.execute {
() ⇒ mailbox.pop() match {
case Some(msg) ⇒
receive(msg)
handle()
case None ⇒
lock.lazySet(false)
}
}
pop
и обновлением lock-а (в ветке с None
до lazySet
) придет новое сообщение, но тк lock
еще не был отпущен, а pop
вернул None, цикл обработки сообщений может прерваться. Чтобы этого не произошло, попытаемся запустить его:def handle(): Unit = ec.execute {
() ⇒ mailbox.pop() match {
case Some(msg) ⇒
receive(msg)
handle()
case None ⇒
lock.lazySet(false)
if (mailbox.nonEmpty) tryHandle()
}
}
Где receive
- пользовательская функция для реагирования на сообщения, nonEmpty
- метод очереди реализованный как:def nonEmpty: Boolean = tail.get.next ne null
В решении, предлагаемом авторами книги, можно заметить дополнительную оптимизацию, которая позволяет за один вызов метода
handle
обработать более одного сообщения, но она тривиальна и не нуждается в дополнительном разъяснении.На этом для себя главу считаю освоенной, тк прочие аспекты куда проще для понимания :)
Комментариев нет:
Отправить комментарий