Подтвердить что ты не робот

Правильный дизайн в акке. - Доставка сообщений

Я прочитал некоторые сообщения о том, как и почему akka не гарантирует доставку сообщений. документация, этот обсуждение, а другие обсуждения по группе делают объясните это хорошо.

Я новичок в akka и хочу знать подходящий дизайн для случая. Например, у меня есть 3 разных актера на разных машинах. Один отвечает за поваренные книги, а другой за историю и за последние годы за технологические книги.

У меня есть главный актер на другой машине. Предположим, есть запрос к основному актеру для поиска, если у нас есть доступная книга. Главный актер отправляет запросы трем отдаленным актерам и ожидает результата. Поэтому я делаю это:

  val scatter = system.actorOf(
        Props[SearchActor].withRouter(ScatterGatherFirstCompletedRouter(
              routees=someRoutees, within = 10 seconds)), "router")
  implicit val timeout = Timeout(10 seconds)
  val futureResult = scatter ?  Text("Concurrency in Practice")
  //      What should I do here?.
  //val result = Await.result(futureResult, timeout.duration) line(a)

Короче говоря, я отправил запросы всем 3 удаленным игрокам и ожидаю результата за 10 секунд.

Каким должен быть действие?

  • Скажем, я не получаю результат за 10 секунд, должен ли я отправить новый запрос всем им снова?
  • Что делать, если время within преждевременно. Но я не знаю заранее, сколько времени может потребоваться.
  • Что делать, если время within было достаточно, но сообщение было сброшено.

Если я не получаю ответ в within и снова отправлю запрос. Что-то вроде этого, оно остается асинхронным:

futureResult onComplete{
  case Success(i) => println("Result "+i)
  case Failure(e) => //send again
}

Но при слишком многих запросах не будет слишком много потоков на вызове и громоздких? Если я раскомментирую line(a), он станет синхронным, и под нагрузкой может произойти плохо.

Скажем, я не получаю ответ за 10 секунд. Если время within было преждевременным, то снова происходит тяжелое бесполезное вычисление. Если messsage упал, тогда 10 секунды полезного времени потрачены впустую. В случае, скажем, я знал, что сообщение доставлено, я, вероятно, буду ждать более продолжительной продолжительности, не скептически.

Как люди решают такие проблемы? ACK? Но тогда я должен сохранить состояние в акторе всех запросов. Это должно быть обычным делом, и я ищу правильный дизайн.

4b9b3361

Ответ 1

Я попытаюсь ответить на некоторые из этих вопросов для вас. У меня не будет конкретных ответов на все, но, надеюсь, я смогу вести вас в правильном направлении.

Для начала вам нужно будет внести изменения в то, как вы сообщаете запрос трем актерам, которые выполняют поиск книг. Использование ScatterGatherFirstCompletedRouter, вероятно, неверно. Этот маршрутизатор будет ждать ответа только от одного из маршрутов (первого для ответа), поэтому ваш набор результатов будет неполным, так как он не будет содержать результаты других двух маршрутов. Существует также BroadcastRouter, но это не будет соответствовать вашим потребностям, поскольку оно обрабатывает только tell (!), а не ask (?). Чтобы сделать то, что вы хотите сделать, один из вариантов - отправить запрос каждому получателю, получив Futures для ответов, а затем объединить их в агрегат Future с помощью Future.sequence. Упрощенный пример может выглядеть так:

case class SearchBooks(title:String)
case class Book(id:Long, title:String)

class BookSearcher extends Actor{

  def receive = {
    case req:SearchBooks =>
      val routees:List[ActorRef] = ...//Lookup routees here
      implicit val timeout = Timeout(10 seconds)
      implicit val ec = context.system.dispatcher

      val futures = routees.map(routee => (routee ? req).mapTo[List[Book]])
      val fut = Future.sequence(futures)

      val caller = sender //Important to not close over sender
      fut onComplete{
        case Success(books) => caller ! books.flatten

        case Failure(ex) => caller ! Status.Failure(ex)
      }
  }
}

Теперь, когда это не будет нашим окончательным кодом, но это приблизительная оценка того, что пытался сделать ваш образец. В этом примере, если какой-либо из нисходящих маршрутов терпит неудачу/время, мы попадаем в наш блок Failure, и вызывающий может также получить сбой. Если все они удастся, вызывающий получит вместо него только список объектов Book.

Теперь на ваши вопросы. Во-первых, вы спрашиваете, следует ли отправлять запрос всем участникам снова, если вы не получите ответ от одного из маршрутов в течение таймаута. Ответ на этот вопрос действительно зависит от вас. Разрешите ли вы на другом конце вашего пользователя увидеть частичный результат (т.е. Результаты от 2 из 3 участников) или всегда должен быть полный набор результатов каждый раз? Если да, вы можете настроить код, который отправляется на маршруты, чтобы выглядеть так:

val futures = routees.map(routee => (routee ? req).mapTo[List[Book]].recover{
  case ex =>
    //probably log something here
    List()
})

С помощью этого кода, если какой-либо из расписаний маршрутов или сбой по какой-либо причине, пустой список "Книга" будет заменен на ответ вместо отказа. Теперь, если вы не можете жить с частичными результатами, вы можете повторно отправить весь запрос еще раз, но вы должны помнить, что, вероятно, кто-то на другом конце ждет результатов своей книги, и они не хотят ждать вечно.

Для вашего второго вопроса вы спрашиваете, что, если ваш тайм-аут преждевременен? Выбранное вами значение тайм-аута будет полностью зависеть от вас, но оно, скорее всего, должно основываться на двух факторах. Первый фактор будет получен от проверки времени вызова поисковых запросов. Узнайте в среднем, сколько времени потребуется, и выберите значение, основанное на этом, с небольшой подушкой, чтобы быть в безопасности. Второй фактор заключается в том, как долго кто-то на другом конце готов ждать результатов. Вы можете быть очень консервативны в своем тайм-ауте, заставляя его как 60 секунд просто быть в безопасности, но если на самом деле кто-то на другом конце ждет результатов, как долго они готовы ждать? Я предпочел бы получить ответ об отказе, указывающий, что я должен попробовать снова, а не ждать навсегда. Поэтому, принимая во внимание эти два фактора, вы должны выбрать значение, которое позволит вам получать ответы на очень высокий процент времени, пока еще не делает вызывающего абонента на другом конце слишком долго.

В вопросе 3 вы спрашиваете, что произойдет, если сообщение упадет. В этом случае я предполагаю, что будущее для того, кто будет получать это сообщение, будет просто тайм-аутом, потому что он не получит ответа, потому что участник-получатель никогда не получит сообщение, на которое нужно ответить. Акка не JMS; он не имеет режимов подтверждения, когда сообщение может быть повторно отправлено несколько раз, если получатель не получает и не подтверждает сообщение.

Кроме того, как вы можете видеть из моего примера, я согласен с тем, что не блокирую агрегат Future с помощью Await. Я предпочитаю использовать неблокирующие обратные вызовы. Блокировка в функции приема не идеальна, поскольку экземпляр Actor перестает обрабатывать свой почтовый ящик до тех пор, пока эта операция блокировки не завершится. Используя неблокирующий обратный вызов, вы освобождаете этот экземпляр, чтобы вернуться к обработке своего почтового ящика, и разрешить обработку результата просто очередным заданием, которое выполняется в ExecutionContext, отделенным от лица, обрабатывающего его почтовый ящик.

Теперь, если вы действительно хотите не тратить средства связи, когда сеть ненадежна, вы можете посмотреть Надежный проксидоступно в Akka 2.2. Если вы не хотите идти по этому маршруту, вы можете опрокинуть его самостоятельно, периодически отправляя сообщения типа ping на маршруты. Если вы не ответите вовремя, вы помечаете его как "вниз" и не отправляете ему сообщения, пока не получите надежный (за очень короткий промежуток времени) ping от него, вроде как FSM для каждого маршрута. Любой из них может работать, если вам абсолютно необходимо это поведение, но вы должны помнить, что эти решения добавляют сложность и должны использоваться только в том случае, если вам абсолютно необходимо это поведение. Если вы разрабатываете банковское программное обеспечение, и вы абсолютно нуждаетесь в гарантированной семантике поставки, так как плохие финансовые последствия приведут к другому, непременно пойдут с таким подходом. Просто будьте осторожны при принятии решения, нужно ли вам что-то вроде этого, потому что я ставлю в 90% случаев, когда вы этого не делаете. В вашей модели единственный человек, который, вероятно, пострадал от ожидания чего-то, что вы, возможно, уже знали, не будет успешным, - это вызывающий абонент на другом конце. Используя неблокирующие обратные вызовы в актере, он не останавливается на том, что что-то может занять много времени; он уже перешел к следующему сообщению. Вы также должны быть осторожны, если решите повторно подать заявку на отказ. Вы не хотите наводнять почтовые ящики получателей. Если вы решите повторно отправить его, ограничьте его фиксированным числом раз.

Еще один возможный подход, если вам нужен этот гарантированный вид семантики, может заключаться в том, чтобы изучить модель Akka Clustering Model. Если вы сгруппировали маршруты вниз по течению и один из серверов потерпел неудачу, весь трафик будет перенаправлен на node, который все еще находился до тех пор, пока другой node не восстановится.