Подтвердить что ты не робот

HTTP-запрос Throttle на Akka/Spray

Я использую актеров Akka в Scala для загрузки ресурсов из внешней службы (запрос HTTP get). Ответ от внешней службы - JSON, и я должен использовать пейджинг (провайдер очень медленный). Я хочу загрузить все вычисленные результаты одновременно в 10 потоках. Я использую такой URL, чтобы загрузить кусок: http://service.com/itmes?limit=50&offset=1000

Я создал следующий конвейер:

ScatterActor => RoundRobinPool[10](LoadChunkActor) => Aggreator

ScatterActor берет общее количество элементов для загрузки и делит его на куски. Я создал 10 LoadChunkActor для одновременной обработки задач.

  override def receive: Receive = {
    case LoadMessage(limit) =>
    val offsets: IndexedSeq[Int] = 0 until limit by chunkSize
    offsets.foreach(offset => context.system.actorSelection(pipe) !
    LoadMessage(chunkSize, offset))
 }

LoadChunkActor использует Spray для отправки запроса. Актер выглядит следующим образом:

val pipeline = sendReceive ~> unmarshal[List[Items]]
override def receive: Receive = {
  case LoadMessage(limit, offset) =>
    val uri: String = s"http://service.com/items?limit=50&offset=$offset"
    val responseFuture = pipeline {Get(uri)}
    responseFuture onComplete {
      case Success(items) => aggregator ! Loaded(items)
    }
 }

Как вы можете видеть, LoadChunkActor запрашивает кусок из внешней службы и добавляет обратный вызов для запуска onComplete. Актер теперь готов принять еще одно сообщение, и он просит еще один кусок. Spray использует неблокирующий API для загрузки кусков. В результате внешняя служба заливается моими запросами, и я получаю таймауты.

Как мне планировать список задач, но я хочу обрабатывать максимум 10 одновременно?

4b9b3361

Ответ 1

Я создал следующее решение (похожее на pull http://www.michaelpollmeier.com/akka-work-pulling-pattern/:

ScatterActor (10000x messages) => 
  ThrottleActor => LoadChunkActor => ThrottleMonitorActor => Aggregator
         ^                                    |
         |<--------WorkDoneMessage------------|
  • ThrottleActor публикует сообщения в ListBuffer и отправляет на LoadChunkActor максимальное количество сообщений.
  • Когда LoadChunkActor отправляет сообщение в агрегатор через ThrottleMonitorActor.
  • ThrottleMonitorActor отправляет подтверждение ThrottleActor.
  • ThrottleActor отправляет следующее сообщение LoadChunkActor.