Подтвердить что ты не робот

Как работает I/O в Акке?

Как работает модель актера (в Akka), когда вам нужно выполнять операции ввода-вывода (т.е. операцию базы данных)?

Я понимаю, что операция блокировки вызовет исключение (и по существу разрушит все concurrency из-за обусловленной природой Netty, которую использует Akka). Следовательно, мне пришлось бы использовать Future или что-то подобное - однако я не понимаю модель concurrency.

  • Может ли 1 актер обрабатывать несколько сообщений одновременно?
  • Если актер делает блокирующий вызов в Future (т.е. future.get()), это блокирует только текущее исполнение актера; или он предотвратит выполнение для всех участников до завершения блокирующего вызова?
  • Если он блокирует все выполнение, как использовать будущую помощь concurrency (т.е. не будет вызывать блокировку вызовов в будущем, все еще это означает создание актера и выполнение блокирующего вызова)?
  • Каков наилучший способ работы с многоэтапным процессом (т.е. чтение из базы данных, вызов блокирующего веб-сервиса, чтение из базы данных, запись в базу данных), где каждый шаг зависит от последнего?

Основной контекст таков:

  • Я использую сервер Websocket, который будет поддерживать тысячи сеансов.
  • Каждый сеанс имеет некоторое состояние (например, данные аутентификации и т.д.);
  • Клиент Javascript отправит на сервер сообщение JSON-RPC, которое передаст его соответствующему игроку сеанса, который выполнит его и вернет результат.
  • Выполнение вызова RPC будет включать некоторые операции ввода-вывода и блокировки вызовов.
  • Будет много одновременных запросов (каждый пользователь будет делать значительное количество запросов по соединению WebSocket, и будет много пользователей).

Есть ли лучший способ достичь этого?

4b9b3361

Ответ 1

Операции блокировки не генерируют исключений в Akka. Вы можете блокировать звонки с Актера (который вы, вероятно, хотите свести к минимуму, но это другая история).

  • нет, 1 экземпляр субъекта не может.
  • Он не будет блокировать других актеров. Вы можете повлиять на это, используя специальный диспетчер. Фьючерсы используют диспетчера по умолчанию (глобальное событие, управляемое обычным образом), поэтому он запускается в потоке в пуле. Вы можете выбрать, какой диспетчер вы хотите использовать для своих актеров (для каждого актера или для всех). Я думаю, если бы вы действительно хотели создать проблему, вы могли бы передать точно тот же (потоковый) диспетчер для фьючерсов и актеров, но это с определенной долей намерения. Я думаю, если у вас есть огромное количество фьючерсов, блокирующих неограниченное время, а служба-исполнитель настроена на фиксированное количество потоков, вы можете взорвать службу-исполнитель. Так много "ifs". f.get блоки, только если будущее еще не завершено. Он будет блокировать "текущий поток" Актера, из которого вы его называете (если вы называете это от актера, что не обязательно кстати)
  • вам необязательно блокировать. вы можете использовать обратный вызов вместо f.get. Вы даже можете создавать фьючерсы без блокировки. поговорите с Виктором о "перспективном будущем akka" для более подробной информации: http://skillsmatter.com/podcast/scala/talk-by-viktor-klang
  • Я бы использовал асинхронную связь между этапами (если этапы представляют собой значимые процессы сами по себе), поэтому используйте актера для каждого шага, где каждый актер отправляет однонаправленное сообщение в следующее, возможно, также однопользовательское сообщение другому актеру которые не будут блокировать, которые могут контролировать процесс. Таким образом, вы могли бы создавать цепочки актеров, из которых вы могли бы сделать много, перед ним вы могли бы поставить актер балансировки нагрузки, так что если один актер блокируется в одной цепочке, другой из того же типа может не быть в другой цепочке. Это также будет работать для вашего "контекстного" вопроса, передачи рабочей нагрузки местным игрокам, связать их за актером балансировки нагрузки.

Что касается netty (и я предполагаю, что вы имеете в виду Remote Actors, потому что это единственная вещь, с которой netty используется в Akka), пропустите свою работу как можно скорее локальному актеру или будущему (с обратным вызовом), если вы беспокоитесь о сроках или не позволяете нетти делать это в некотором роде.

Ответ 2

Операции блокировки обычно не генерируют исключения, но ожидание в будущем (например, с помощью методов отправки !! или !!!) может вызывать исключение тайм-аута. Вот почему вы должны как можно больше придерживаться огня и забыть, использовать значимое значение тайм-аута и, по возможности, предпочитать обратные вызовы.

  • Аккаунт akka не может явно обрабатывать несколько сообщений в строке, но вы можете играть со значением throughput через файл конфигурации. Затем актер обработает несколько сообщений (т.е. Его метод приема будет вызываться несколько раз подряд), если его очередь сообщений не пуста: http://akka.io/docs/akka/1.1.3/scala/dispatchers.html#id5

  • Блокирующие операции внутри актера не будут "блокировать" всех участников, но если вы обмениваетесь потоками между участниками (рекомендуемое использование), один из потоков диспетчера будет заблокирован до возобновления операций. Поэтому старайтесь как можно больше составлять фьючерсы и остерегайтесь значения тайм-аута).

3 и 4. Я согласен с ответами Раймонда.

Ответ 3

Что сказал Раймонд и парадигматичный, но также, если вы хотите избежать голодания пула потоков, вы должны обернуть любые операции блокировки в scala.concurrent.blocking.

Конечно, лучше избегать операций блокировки, но иногда вам нужно использовать библиотеку, которая блокирует. Если вы завершите указанный код в blocking, он позволит контексту выполнения знать, что вы можете блокировать этот поток, чтобы он мог выделить другой, если это необходимо.

Проблема хуже парадигматической, поскольку, если у вас есть несколько операций блокировки, вы можете заблокировать все потоки в пуле потоков и не иметь свободных потоков. Вы можете оказаться в тупике, если все ваши потоки заблокированы тем, что не произойдет, пока не будет запланирован другой актер/будущее.

Вот пример:

import scala.concurrent.blocking
...

Future {
  val image = blocking { load_image_from_potentially_slow_media() }
  val enhanced = image.enhance()
  blocking {
    if (oracle.queryBetter(image, enhanced)) {
      write_new_image(enhanced)
    }
  }
  enhanced
}

Документация здесь.