Подтвердить что ты не робот

Приостановка актера в Акке

У меня есть актер в Акке, который будет обрабатывать сообщения для создания определенных объектов. Некоторые поля этих объектов вычисляются на основе состояния других объектов в базе данных в момент создания.

Я бы хотел избежать создания условия гонки, когда обработка актера идет быстрее, чем база данных может сохранять объекты. Это может привести к несогласованным данным, следующим образом:

  • Актер создает Foo и отправляет его другим участникам для дальнейшей обработки и сохранения
  • Актеру предлагается создать еще один Foo. Поскольку первый еще не сохранен, новый создается на основе старого содержимого БД, тем самым создавая неправильный Foo.

Теперь эта возможность довольно удалена, так как создание Foo будет запускаться вручную. Но по-прежнему возможно, что двойной щелчок может вызвать проблемы при высокой нагрузке. И кто знает, будет ли завтра Foo автоматически создано.

Следовательно, мне нужен какой-то способ заставить актера ждать и возобновить его работу только после подтверждения того, что Foo сохранено.

Есть ли способ поставить актера в состояние ожидания и сказать ему возобновить его операции через некоторое время?

В принципе, я хотел бы использовать почтовый ящик в качестве очереди сообщений и контролировать скорость обработки очереди.

4b9b3361

Ответ 1

Нет, вы не можете приостановить действие актера: актеры всегда вытаскивают сообщения из своего почтового ящика как можно быстрее. Это оставляет только возможность того, что входящие запросы будут спрятаны, для последующей обработки:

class A(db: ActorRef) extends Actor with Stash {
  def receive = {
    case Request =>
      doWork()
      db ! Persist
      context.setReceiveTimeout(5.seconds)
      context.become({
        case Request        => stash()
        case Persisted      => context.unbecome(); unstashAll()
        case ReceiveTimeout => throw new TimeoutException("not persisted")
      }, discardOld = false)
  }
}

Обратите внимание, что доставка сообщений не гарантируется (или база данных может быть недоступна), и поэтому рекомендуется время ожидания.

Основная проблема

Эта проблема проявляется главным образом в тех случаях, которые недостаточно хорошо выровнены между моделью актера и моделью домена: актер является единицей согласованности, но в вашем случае использования ваше согласованное изображение требует наличия актуального внешнего сущности (базы данных), чтобы актер поступал правильно. Я не могу рекомендовать решение, не зная больше о прецеденте, но попытайтесь переделать свою проблему с учетом этого.

Ответ 2

Оказывается, для этого требуется всего несколько строк. Это решение, с которым я столкнулся, что согласуется с предложением pagoda_5b:

class QueueingActor(nextActor: ActorRef) extends Actor with Stash {
  import QueueingActor._

  def receive = {
    case message =>
      context.become({
        case Resume =>
          unstashAll()
          context.unbecome()
        case _ => stash()
      })
      nextActor ! message
  }
}

object QueueingActor {
  case class Resume()
}