Подтвердить что ты не робот

Какое сообщение отправляет pipeTo на тайм-аут или другой сбой?

В Akka вместо использования onComplete для будущего ответа, созданного с помощью?, я пытаюсь использовать pipeTo, потому что это, предположительно, предпочтительный шаблон. Тем не менее, я, кажется, не получаю никаких Throwables или Failures, когда будущее истечет. Что я должен ожидать получить у своего актера, если тайм-аут возникает при использовании pipeTo? Как насчет того, когда будет создано другое исключение? Пример кода:

class Simple(otherActor : ActorRef) extends Actor{
  def receive = {
     case "some_msg" => {
       val implicit timeout = Timeout(1 seconds)
       val response = otherActor ? "hello" 
       response pipeTo self
     }

     // case ??? // How do I handle timeouts?
  }
}

Если сообщение не отправляется автоматически при возникновении тайм-аута, как я должен обрабатывать таймауты с помощью pipeTo?

4b9b3361

Ответ 1

Сбой будущего отправляется как сообщение akka.actor.Status.Failure, содержащее исключение. Исключением для тайм-аута является akka.pattern.AskTimeoutException.

Ответ 2

Если ваш пример близко соответствует вашему фактическому коду, я не уверен, что pipeTo - это то, что вы хотите здесь. Сопровождение сообщения для себя, для меня, не имеет особого смысла, и есть лучшие решения для случая, когда актер отправляет сообщение другому актеру, а затем ждет ответа. Сначала позвольте говорить о pipeTo. Я думаю, что хорошим примером того, когда использовать pipeTo, является то, что у вас было три актера: A, B и C. A отправляет сообщение B, которое, в свою очередь, отправляет сообщение на C и этот ответ от C должен быть возвращен в после того, как B сначала сделает что-то еще. В этом примере вы можете сделать что-то вроде этого внутри B:

val fut = actorC ? someMessage
fut map(someMapFunc) pipeTo sender

Здесь функция pipeTo помогает предотвратить случайное закрытие измененного sender var, если вы вместо этого используете что-то вроде onComplete и отвечаете на sender внутри этого обратного вызова.

Теперь, для вашего случая, если вы просто хотите, чтобы A разговаривал с B, а затем ждать ответа от B (и обрабатывать потенциальные таймауты), вы можете попробовать что-то вроде этого:

class ActorA extends Actor{
  import context._
  val myB = context.actorOf(Props[ActorB])

  def receive = {
    case msg =>
      myB ! msg
      setReceiveTimeout(2 seconds)
      become(waitingForResponse)
  }

  def waitingForResponse:Receive = {
    case ReceiveTimeout =>
      println("got a receive timeout")
      cancelReceiveTimeout

    case response =>
      println("got my response back")
      cancelReceiveTimeout
  }

  def cancelReceiveTimeout = setReceiveTimeout(Duration.Undefined) 
}

В этом примере A начинается с частичной функции receive по умолчанию. Когда он получает сообщение, он отправляет другое сообщение в B, устанавливает тайм-аут приема для получения ответа от B и затем переключает его на функцию receive на то, что является специфическим для ожидания ответа от B. В этой новой функции приема, Я мог бы либо получить свой ответ от B вовремя, либо получить ReceiveTimeout, указав, что вовремя не получил ответа. В любом случае я отменяю свой тайм-аут приема, потому что он повторяется.

Теперь это очень упрощено, но я просто пытался показать один способ сделать обратное и вперед между двумя участниками, что кажется вашим примером.