Подтвердить что ты не робот

Акка Акка, Фьючерсы и закрытия

Я прочитал в Akka docs, что опасно закрывать переменные от окружающего актера.

Внимание

В этом случае вам необходимо осторожно избегать закрытия содержащие ссылки на участников, т.е. не вызывают методы на включающий актера из анонимного класса Актера. Это нарушить инкапсуляцию актера и может привести к ошибкам синхронизации и условия гонки, потому что другой код участников будет запланирован одновременно с окружающим актером.

Теперь у меня есть два актера, один из которых просит что-то из второго и что-то делает с результатом. В этом примере ниже я собрал актер Аккумулятор извлекает числа из actor NumberGenerator и добавляет их, сообщая сумму по пути.

Это можно сделать, по крайней мере, двумя разными способами, как показано в этом примере с двумя различными функциями приема (A vs B). Разница между ними заключается в том, что A не закрывается над переменной счетчика; вместо этого он ожидает целое число и суммирует его, а B создает Будущее, которое закрывает счетчик и делает сумму. Это происходит внутри анонимного актера, созданного только для обработки onSuccess, если я правильно понимаю, как это работает.

import com.esotericsoftware.minlog.Log

import akka.actor.{Actor, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import akka.util.duration._

case object Start
case object Request


object ActorTest {
  var wake = 0

  val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator")
  val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator")

  Log.info("ActorTest", "Starting !")

  accRef ! Start
}

class Accumulator extends Actor {
  var counter = 0

  implicit val timeout = Timeout(5 seconds)

  // A: WITHOUT CLOSURE
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self
    case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
  }
  // B: WITH CLOSURE
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess {
      case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
    }
  }
}

class NumberGenerator extends Actor {
  val rand = new java.util.Random()

  def receive = {
    case Request => sender ! rand.nextInt(11)-5
  }
}

Является ли совершенно злым использовать закрытие в этом случае? Конечно, я мог бы использовать AtomicInteger вместо Int или в каком-то сетевом сценарии, используя, скажем, netty, выпустить операцию записи на threadsafe, но это не моя точка здесь.

Чтобы рискнуть спросить смешного: есть ли способ для будущего onSuccess выполнить в этом акторе вместо анонимного среднего актера, без, определяющего случай в функция приема?

ИЗМЕНИТЬ

Чтобы выразить это более ясно, мой вопрос: есть ли способ заставить серию фьючерсов работать в том же потоке, что и данный Актер?

4b9b3361

Ответ 1

Проблема заключается в том, что onSuccess будет запускаться в другом потоке, чем поток, в который будет работать актер receive. Вы можете использовать подход pipeTo или использовать Agent. Создание counter a AtomicInteger решит проблему, но это не так чисто, то есть разбивает модель Актера.

Ответ 2

Самый простой способ реализации такого дизайна - использовать семантику "огонь и забыть":

class Accumulator extends Actor {
  private[this] var counter = 0

  def receive = {
    case Start => ActorTest.genRef ! Request
    case x: Int => {
      counter += x
      Log.info("Accumulator", "counter = " + counter)
      self ! Start
    }
  }
}

Это решение полностью асинхронно, и вам не нужен тайм-аут.