Подтвердить что ты не робот

Akka HTTP: блокирование в будущем блокирует сервер

Я пытаюсь использовать Akka HTTP для базового аутентификации моего запроса. Так получилось, что у меня есть внешний ресурс для аутентификации, поэтому я должен сделать отдых для этого ресурса.

Это занимает некоторое время, и пока он обрабатывается, кажется, что остальная часть моего API заблокирована, ожидая этого вызова. Я воспроизвел это с помощью очень простого примера:

// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()


val routes = 
  (post & entity(as[String])) { e =>
    complete {
      Future{
        Thread.sleep(5000)
        e
      }
    }
  } ~
  (get & path(Segment)) { r =>
    complete {
      "get"
    }
  }

Если я отправлю сообщение в конечную точку журнала, моя конечная точка доступа также застряла в ожидании 5 секунд, которые продиктовала конечная точка журнала.

Является ли это ожидаемым поведением, а если есть, как мне сделать блокирующие операции без блокировки всего моего API?

4b9b3361

Ответ 1

То, что вы наблюдаете, - это ожидаемое поведение, но, конечно, это очень плохо. Хорошо, что существуют известные решения и лучшие практики для защиты от этого. В этом ответе я хотел бы потратить некоторое время, чтобы объяснить проблему коротко, долго, а затем в глубине - наслаждаться чтением!

Короткий ответ: "не блокируйте инфраструктуру маршрутизации!", всегда используйте выделенный диспетчер для операций блокировки!

Причина наблюдаемого симптома: Проблема заключается в том, что вы используете context.dispatcher в качестве диспетчера, который выполняет блокирующие фьючерсы. Тот же диспетчер (который в простом смысле просто "пучок потоков" ) используется инфраструктурой маршрутизации для фактического обработки входящих запросов, поэтому, если вы блокируете все доступные потоки, вы в конечном итоге голодаете инфраструктуру маршрутизации. (Вопрос для обсуждения и бенчмаркинга - если Akka HTTP может защитить от этого, я добавлю это в мой список todo-to-to-list).

Блокирование должно обрабатываться с особой осторожностью, чтобы не воздействовать на других пользователей одного и того же диспетчера (поэтому мы делаем так просто отделить исполнение от разных), как объясняется в разделе "Акка": Блокировка требует тщательного управления.

Что-то еще, что я хотел обратить внимание здесь, заключается в том, что следует избегать блокировки API-интерфейсов вообще, если это возможно, - если ваша длительная операция - это не одна операция, а серия из них, вы могли бы разделить их на разных участников или секвентированные фьючерсы. Во всяком случае, просто хотелось указать - если это возможно, избегайте таких блокирующих вызовов, но если вам нужно - тогда объясняется, как правильно справляться с ними.

Углубленный анализ и решения:

Теперь, когда мы знаем, что не так, концептуально, давайте посмотрим, что именно сломано в приведенном выше коде, и как выглядит правильное решение этой проблемы:

Цвет = состояние потока:

  • бирюзовый - Соня
  • orange - ОЖИДАНИЕ
  • зеленый - RUNNABLE

Теперь давайте рассмотрим 3 части кода и то, как они влияют на диспетчеров и производительность приложения. Чтобы заставить это поведение, приложение было загружено следующим образом:

  • [a] продолжать запрашивать запросы GET (см. выше код в исходном вопросе для этого), он не блокирует там
  • [b], а затем после пожара 2000 запросов POST, которые вызовут блокировку 5 секунд после возвращения в будущее

1) [bad] Поведение диспетчера при неправильном коде:

// BAD! (due to the blocking in Future):
implicit val defaultDispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses defaultDispatcher
      Thread.sleep(5000)                    // will block on the default dispatcher,
      System.currentTimeMillis().toString   // starving the routing infra
    }
  }
}

Итак, мы выставляем наше приложение на загрузку [a], и вы можете увидеть уже несколько потоков akka.actor.default-dispatcher - они обрабатывают запросы - небольшой зеленый фрагмент, а оранжевый - другие, на самом деле свободные есть.

блокировка убивает диспетчера по умолчанию

Затем мы запускаем загрузку [b], которая вызывает блокировку этих потоков - вы можете увидеть, что предыдущий поток "default-dispatcher-2,3,4" блокируется после простоя. Мы также наблюдаем, что пул растет - новые потоки запускаются "default-dispatcher-18,19,20,21...", однако они сразу же спят (!) - мы тратим драгоценный ресурс здесь!

Число таких запущенных потоков зависит от конфигурации диспетчера по умолчанию, но, вероятно, не превысит 50 или около того. Поскольку мы просто запускали блокировку с блокировкой 2k, мы голодали на весь поток - блокирующие операции доминировали таким образом, что для маршрутизации infra нет потока, доступного для обработки других запросов - очень плохо!

Сделайте что-нибудь об этом (что лучше всего подходит Akka btw - всегда изолируйте поведение блокировки, как показано ниже):

2) [good!] Поведение диспетчера хорошего структурированного кода/диспетчеров:

В вашем application.conf настройте этот диспетчер, предназначенный для блокировки поведения:

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // in Akka previous to 2.4.2:
    core-pool-size-min = 16
    core-pool-size-max = 16
    max-pool-size-min = 16
    max-pool-size-max = 16
    // or in Akka 2.4.2+
    fixed-pool-size = 16
  }
  throughput = 100
}

Вы должны прочитать больше в документации Akka Dispatchers, чтобы понять различные варианты здесь. Главное, однако, что мы выбрали ThreadPoolExecutor, у которого есть жесткий предел потоков, который он сохраняет для блокировки ops. Настройки размера зависят от того, что делает ваше приложение, и от количества ядер вашего сервера.

Затем нам нужно использовать его вместо стандартного:

// GOOD (due to the blocking in Future):
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")

val routes: Route = post { 
  complete {
    Future { // uses the good "blocking dispatcher" that we configured, 
             // instead of the default dispatcher – the blocking is isolated.
      Thread.sleep(5000)
      System.currentTimeMillis().toString
    }
  }
}

Мы нажимаем приложение с использованием той же нагрузки, сначала немного обычных запросов, а затем добавляем блокирующие. Вот как будет выглядеть ThreadPools в этом случае:

блокирующий пул масштабируется в соответствии с нашими потребностями

Итак, изначально обычные запросы легко обрабатываются диспетчером по умолчанию, вы можете увидеть несколько зеленых линий там - это фактическое выполнение (я на самом деле не помещаю сервер в большую нагрузку, поэтому он в основном простаивает).

Теперь, когда мы начинаем выдавать блокирующие операционные системы, my-blocking-dispatcher-* запускается и запускается до количества настроенных потоков. Он обрабатывает все Спящие там. Кроме того, после определенного периода ничего не происходит на этих потоках, он закрывает их. Если бы мы поразили сервер другой путкой блокировки, пул запустил бы новые потоки, которые будут заботиться о сне(), но в то же время - мы не тратим наши драгоценные потоки на "просто остаемся там и ничего не делать".

При использовании этой настройки пропускная способность обычных запросов GET не была затронута, они все еще были счастливы обслуживаться диспетчером по умолчанию (по-прежнему довольно свободным).

Это рекомендуемый способ борьбы с любым блокированием в реактивных приложениях. Он часто упоминается как "переборка" (или "изоляция" ) с плохими элементами приложения, в этом случае плохое поведение спя или блокирует.

3) [workaround-ish] поведение диспетчера при правильном применении blocking:

В этом примере мы используем метод scaladoc для scala.concurrent.blocking, который может помочь при столкновении с блокировками. Обычно это приводит к увеличению количества потоков, чтобы пережить блокирующие операции.

// OK, default dispatcher but we'll use `blocking`
implicit val dispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses the default dispatcher (it a Fork-Join Pool)
      blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
                 // but at the cost of exploding the number of threads (which eventually
                 // may also lead to starvation problems, but on a different layer)
        Thread.sleep(5000)
        System.currentTimeMillis().toString
       }
    }
  }
}

Приложение будет вести себя следующим образом:

блокировка вызывает больше потоков для запуска

Вы заметите, что создано много новых потоков, потому что блокировка намекает на "о, это будет блокировка, поэтому нам нужно больше потоков". Это приводит к тому, что общее время, которое мы заблокировали, будет меньше, чем в примере 1), однако потом у нас есть сотни нитей, которые ничего не делают после завершения блокирующих операций... Конечно, они в конечном итоге будут отключены (FJP делает это), но какое-то время у нас будет большое (неконтролируемое) количество потоков, в отличие от 2) решения, где мы точно знаем, сколько потоков мы посвящаем для поведения блокировки.

Подведение итогов: никогда не блокируйте диспетчера по умолчанию:-)

Лучше всего использовать шаблон, показанный в 2), чтобы иметь диспетчер доступных операций блокировки и выполнять их там.

Надеюсь, это поможет, счастливый hakking!

Обсуждаемая версия HTTP Akka: 2.0.1

Используемый профайлер:. Многие люди спрашивали меня в ответ на этот ответ в частном порядке, какой профайлер я использовал для визуализации состояний Thread в приведенных выше фотографиях, поэтому добавив эту информацию здесь: я использовал YourKit, который является удивительным коммерческим профилировщиком (бесплатно для OSS), хотя вы можете достичь тех же результатов, используя бесплатный VisualVM из OpenJDK.

Ответ 2

Странно, но для меня все работает нормально (без блокировки). Вот код:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer

import scala.concurrent.Future


object Main {

  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val routes: Route = (post & entity(as[String])) { e =>
    complete {
      Future {
        Thread.sleep(5000)
        e
      }
    }
  } ~
    (get & path(Segment)) { r =>
      complete {
        "get"
      }
    }

  def main(args: Array[String]) {

    Http().bindAndHandle(routes, "0.0.0.0", 9000).onFailure {
      case e =>
        system.shutdown()
    }
  }
}

Также вы можете обернуть асинхронный код в директиву onComplete или onSuccess:

onComplete(Future{Thread.sleep(5000)}){e} 

onSuccess(Future{Thread.sleep(5000)}){complete(e)}