Подтвердить что ты не робот

Блокировка звонков в Акку Актеры

Как новичок, я пытаюсь понять, как работают актеры. И, из документации, я думаю, я понимаю, что актеры - это объекты, которые выполняются в режиме синхронизации, а также то, что выполнение актера может содержать вызовы метода блокировки/синхронизации, например. db запросы

Но я не понимаю, что если вы напишете актера, у которого есть блокирующие вызовы внутри (например, выполнение блокирующего запроса), это испортит весь пул потоков (в том смысле, что использование процессора будет вниз и т.д.), правильно? Я имею в виду, что, по моему мнению, JVM не может понять, может ли он переключить этот поток на кого-то другого, если/когда актер делает блокирующий вызов.

Итак, учитывая природу concurrency, не должно ли быть очевидно, что актеры не должны делать никаких блокирующих вызовов, когда-либо?

Если это так, каков рекомендуемый способ выполнения неблокирующего/асинхронного вызова, скажем, вызов веб-службы, который извлекает что-то и отправляет сообщение другому игроку, когда этот запрос будет завершен? Если мы просто используем что-то вроде актера:

будущая карта {response = > x! response.body}

Это правильный способ справиться с этим?

Поблагодарили бы, если вы сможете это разъяснить мне.

4b9b3361

Ответ 1

Это действительно зависит от прецедента. Если запросы не нужно сериализовать, вы можете выполнить запрос в будущем и отправить результаты обратно отправителю следующим образом:

import scala.concurrent.{ future, blocking}
import akka.pattern.pipe

val resFut = future {
  blocking {
    executeQuery()
  }
}

resFut pipeTo sender

Вы также можете создать специальный диспетчер исключительно для вызовов БД и использовать роутер для создания актера. Таким образом, вы также можете легко ограничить количество одновременных запросов БД.

Ответ 2

Действительно великое введение "Руководство Неофита к Scala Часть 14: Подход актера к Concurrency" http://danielwestheide.com/blog/2013/02/27/the-neophytes-guide-to-scala-part-14-the-actor-approach-to-concurrency.html.

Актер получает сообщение, завершает блокирование кода в будущем, в нем метод Future.onSuccess - отправляет результаты, используя другие асинхронные сообщения. Но будьте осторожны, что переменная отправителя может измениться, поэтому закройте ее (сделайте локальную ссылку в будущем объекте).

p.s.: Руководство Неофита к Scala - действительно отличная книга.

Обновлено: (добавлен пример кода)

У нас есть рабочий и менеджер. Менеджер устанавливает работу, которая должна быть выполнена, рабочие отчеты "получили" и запускают длительный процесс (сон 1000). Между тем системный пинг менеджер с сообщениями "живой" и менеджер пингует с ними работника. Когда работа выполнена - сотрудник уведомляет об этом менеджера.

Примечание: выполнение sleep 1000 выполняется в импортированном исполнителе пула потоков по умолчанию /global - вы можете получить головоломку потока. NB: val commander = отправитель необходим, чтобы "закрыть" ссылку на оригинального отправителя, вызывать, когда onSuccess будет выполнен - ​​текущий отправитель внутри актера может быть уже настроен на какой-либо другой "отправитель"...

Log:

01:35:12:632 Humming ...
01:35:12:633 manager: flush sent
01:35:12:633 worker: got command
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:660 worker: started
01:35:12:662 worker: alive
01:35:12:662 manager: resource allocated
01:35:12:662 worker: alive
01:35:12:662 worker: alive
01:35:13:661 worker: done
01:35:13:663 manager: work is done
01:35:17:633 Shutdown!

код:

import akka.actor.{Props, ActorSystem, ActorRef, Actor}
import com.typesafe.config.ConfigFactory
import java.text.SimpleDateFormat
import java.util.Date
import scala.concurrent._
import ExecutionContext.Implicits.global

object Sample {

  private val fmt = new SimpleDateFormat("HH:mm:ss:SSS")

  def printWithTime(msg: String) = {
    println(fmt.format(new Date()) + " " + msg)
  }

  class WorkerActor extends Actor {
    protected def receive = {
      case "now" =>
        val commander = sender
        printWithTime("worker: got command")
        future {
          printWithTime("worker: started")
          Thread.sleep(1000)
          printWithTime("worker: done")
        }(ExecutionContext.Implicits.global) onSuccess {
          // here commander = original sender who requested the start of the future
          case _ => commander ! "done" 
        }
        commander ! "working"
      case "alive?" =>
        printWithTime("worker: alive")
    }
  }

  class ManagerActor(worker: ActorRef) extends Actor {
    protected def receive = {
      case "do" =>
        worker ! "now"
        printWithTime("manager: flush sent")
      case "working" =>
        printWithTime("manager: resource allocated")
      case "done" =>
        printWithTime("manager: work is done")
      case "alive?" =>
        printWithTime("manager alive")
        worker ! "alive?"
    }
  }

  def main(args: Array[String]) {

    val config = ConfigFactory.parseString("" +
      "akka.loglevel=DEBUG\n" +
      "akka.debug.lifecycle=on\n" +
      "akka.debug.receive=on\n" +
      "akka.debug.event-stream=on\n" +
      "akka.debug.unhandled=on\n" +
      ""
    )

    val system = ActorSystem("mine", config)
    val actor1 = system.actorOf(Props[WorkerActor], "worker")
    val actor2 = system.actorOf(Props(new ManagerActor(actor1)), "manager")

    actor2 ! "do"
    actor2 ! "alive?"
    actor2 ! "alive?"
    actor2 ! "alive?"

    printWithTime("Humming ...")
    Thread.sleep(5000)
    printWithTime("Shutdown!")
    system.shutdown()

  }
}

Ответ 3

Вы правы, думая о пуле потоков, если вы планируете блокировать звонки в Akka. Чем больше вы блокируете, тем больше будет пул потоков. Полностью неблокирующая система действительно нуждается в пуле потоков, равном количеству ядер процессора вашего компьютера. Эталонная конфигурация использует пул в 3 раза больше ядер ЦП на машине, чтобы обеспечить некоторую блокировку:

    # The core pool size factor is used to determine thread pool core size
    # using the following formula: ceil(available processors * factor).
    # Resulting size is then bounded by the core-pool-size-min and
    # core-pool-size-max values.
    core-pool-size-factor = 3.0

источник

Но вы можете увеличить akka.default-dispatcher.fork-join-executor.core-pool-size-factor до более высокого номера, если вы делаете больше блокировки, или сделайте диспетчера, кроме по умолчанию, специально для блокировки вызовов с более высоким fork-join-executor.core-pool-size-factor

WRT, что лучший способ сделать блокировку вызовов в Akka. Я бы рекомендовал масштабировать, создав несколько экземпляров актеров, которые блокируют вызовы и помещают router в них, чтобы сделать их похожими на одного участника к остальной части вашего приложения.