Разбор реализации актора из книги FP in Scala

cover of the book В книге Functional Programming in Scala, в главе Purely functional parallelism, в конечной реализации Par[a], авторы ссылаются на акторную модель вычислений и приводят в пример ее реализацию: Actor.scala. В основе реализации приводимого актора лежат идеи Non-intrusive MPSC node-based queue, которые не могут не вызывать восторга! В своей статье хочу попытаться разъяснить для себя и всех, кому это будет интересно, эти идеи.

В основе акторной модели лежит идея о том, что каждый актор обладает собственным mailbox-ом, который может пополняться сообщениями в конкурентной манере, а разбираться исключительно синхронно. Для реализации mailbox-а актора нужна соответствующая Multi-Producers-Single-Consumer очередь. Давайте представим ее в виде односвязного списка с двумя заголовками head и tail . Первый будет указывать на конец списка в который будут добавляться новые элементы, второй указатель будет ссылать на конец списка, с которого будет происходить чтение очереди (названия взяты из оригинальной статьи и, вообще говоря, противоречат общепринятым).

Каждый элемент такого списка размещен в ячейке:
class  Node[A](var  a:  A  =  null.asInstanceOf[A]) extends  AtomicReference[Node[A]] {
  // для наглядности опишем явно метод `next`
  def next: Node[A] = get()
  // и метод для изменения ссылки на следующую ячейку
  def next_=(n: Node[A]): Unit = lazySet(n)
}

Запись в очередь

В самом начале head и tail указывают на одну и ту же пустую ячейку:

class MPSCQueue[A] {
  private val head = new AtomicReference[Node[A]](new Node[A]()) 
  private val tail = new AtomicReference[Node[A]](head.get())
}
Рассмотрим внимательно процесс добавления элементов в очередь:
def put(a: A): Unit = {  
  val n = new Node(a)  
  // получаем ссылку на последнюю добавленную ячейку 
  // и двигаем указатель `head` на новую  
  val last = head.getAndSet(n)  
  // теперь за последней ячейкой есть еще одна  
  last.next = n  
}
Процесс вставки элементов:

Обратите внимание на то, что между изменением указателя head и обновлением ссылки на новую ячейку, следующую за последней, может случиться гонка, и некоторый второй поток может успеть полностью выполнить метод put!
Представим себе гонку при вставке самого первого элемента в очередь. Первый поток успевает получить ссылку на пустую ячейку S, но не успевает изменить указатель S.next:

Теперь второй поток начинает выполнять метод put и в качестве последней ячейки видит ячейку N1, но указатель head устанавливает на новую ячейку N2, при этом связывая N2 как следующую за N1:

В этот момент вся очередь приходит в неконсистентное состояние, т.к. со стороны указателя tail, с которого выполняется чтение очереди, очередь пуста!
Но, когда первый поток продолжит свое выполнение, он восстановит утерянную связь между S и N1 (тк его указатель last ссылается на S, а в качестве S.next будет задана N1), и очередь снова станет консистентной!

Чтение из очереди

Подразумевается, что чтение рассматриваемой очереди выполняется синхронно, поэтому здесь не предлагается никаких средств синхронизации. В условиях отсутствия гонки, алгоритм получения элемента из очереди крайне прост:
def pop(): Option[A] = {  
  // tail всегда указывает на пустую ячейку
  val first = tail.get().next  
  if (first ne null) {  
    val a = first.a  
    first.a = null.asInstanceOf[A]  
    tail.lazySet(first)  
    Some(a)  
  } else None  
}

Реализация актора

Синхронизация чтения выполняется уже в реализации цикла обработки сообщений актора. Для этого подойдет lock на основе AtomicBoolean:
val lock = new AtomicBoolean(false)
def tryHandle(): Unit = {  
  // если процесс разбора еще не был запущен,  
  // запустим его, захватив при этом lock  
  if (lock.compareAndSet(false, true)) handle()  
}
def handle(): Unit = mailbox.pop() match {  
  case Some(msg) ⇒  
    // если очередь не пуста, обработаем сообщение  
    receive(msg)  
    // и продолжим разбор mailbox  
    handle()  
  
  case None ⇒  
    // когда очередь опустеет, отпустим lock  
    lock.lazySet(false)  
  }  
}
Процесс разбора очереди можно пытаться запускать снова и снова при получении новых сообщений:
def apply(msg: A): Unit = {  
  mailbox.put(msg)  
  tryHandle() // процесс будет запущен только 1 раз, благодарая lock-у
}
У описанной реализации цикла обработки сообщений есть пара недостатков. Во-первых, обработка сообщений будет происходить в том же потоке, что и отправка сообщения. Чтобы этого избежать, каждый вызов метода handle можно выполнять на отдельном потоке из пула (на самом деле сложно однозначно сказать какое решение лучше, поэтому авторы приводят реализацию актора, в которой данное поведение задается пользователем явным образом):
def handle(): Unit = ec.execute {  
  () ⇒ mailbox.pop() match {  
    case Some(msg) ⇒  
   receive(msg)  
   handle()  
  
    case None ⇒  
   lock.lazySet(false)  
  }  
}
Во-вторых, цикл обработки может быть прерван, но при этом сообщения в очереди могут все еще оставаться. Это может произойти в случае, когда между получением результата от pop и обновлением lock-а (в ветке с None до lazySet) придет новое сообщение, но тк lock еще не был отпущен, а pop вернул None, цикл обработки сообщений может прерваться. Чтобы этого не произошло, попытаемся запустить его:
def handle(): Unit = ec.execute {  
  () ⇒ mailbox.pop() match {  
    case Some(msg) ⇒  
   receive(msg)  
   handle()  
  
    case None ⇒  
   lock.lazySet(false)  
   if (mailbox.nonEmpty) tryHandle()  
  }  
}
Где receive - пользовательская функция для реагирования на сообщения, nonEmpty - метод очереди реализованный как:
def nonEmpty: Boolean = tail.get.next ne null
Исходный код полного решения описанного в статье вы можете найти на github-е.

В решении, предлагаемом авторами книги, можно заметить дополнительную оптимизацию, которая позволяет за один вызов метода handle обработать более одного сообщения, но она тривиальна и не нуждается в дополнительном разъяснении.

На этом для себя главу считаю освоенной, тк прочие аспекты куда проще для понимания :)

Комментариев нет: