Подтвердить что ты не робот

Правильный способ остановить потоки Akka при условии

Я успешно использовал FileIO для потоковой передачи содержимого файла, вычисления некоторых преобразований для каждой строки и агрегирования/уменьшения результатов,

Теперь у меня довольно конкретный вариант использования, где я хотел бы остановить поток, когда условие достигнуто, так что нет необходимости читать весь файл, но процесс завершается как можно скорее. Каков рекомендуемый способ достижения этого?

4b9b3361

Ответ 1

Если условие останова "снаружи потока"

Существует расширенный строительный блок под названием KillSwitch, который вы можете использовать для этого: http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html Поток будет закрыт после уведомления об ошибке.

Он имеет такие методы, как abort(reason)/shutdown и т.д., см. здесь API: http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html

Справочная документация находится здесь: http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala

Пример использования:

val countingSrc = Source(Stream.from(1)).delay(1.second,
    DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]

val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()

doSomethingElse()

killSwitch.shutdown()

Await.result(last, 1.second) shouldBe 2

Если условие останова находится внутри потока

Вы можете использовать takeWhile, чтобы выразить какое-либо условие действительно, хотя иногда take или limit также может быть достаточно "принять 10 lnes".

Если ваша логика очень продвинута, вы можете создать специальный этап, который обрабатывает эту специальную логику с помощью statefulMapConcat, которая позволяет выразить буквально все, чтобы вы могли завершить поток, когда захотите "изнутри".