Подтвердить что ты не робот

Scala.concurrent.blocking - что он на самом деле делает?

Я потратил некоторое время на изучение темы Scala контекстов выполнения, лежащих в основе моделей потоков и concurrency. Можете ли вы объяснить, каким образом scala.concurrent.blocking "настроить поведение во время выполнения" и "может повысить производительность или избежать взаимоблокировок", как описано в scaladoc?

В документации он представлен как средство ожидания api, которое не реализует Awaitable. (Может быть, также нужно просто обернуть вычисление только длинных вычислений?).

Что он на самом деле делает?

Следуя через источник, не легко предает свои секреты.

4b9b3361

Ответ 1

blocking должен выступать в качестве подсказки для ExecutionContext, что содержащийся код блокирует и может привести к голоданию нити. Это даст пулу потоков возможность создавать новые потоки, чтобы предотвратить голодание. Это то, что подразумевается под "настройкой поведения во время выполнения". Это не волшебство, хотя и не будет работать с каждым ExecutionContext.

Рассмотрим следующий пример:

import scala.concurrent._
val ec = scala.concurrent.ExecutionContext.Implicits.global

(0 to 100) foreach { n =>
    Future {
        println("starting Future: " + n)
        blocking { Thread.sleep(3000) }
        println("ending Future: " + n)
    }(ec)
}

Используется глобальный ExecutionContext по умолчанию. Запустив код как есть, вы заметите, что 100 Future выполняются немедленно, но если вы удалите blocking, они будут выполнять только несколько за раз. По умолчанию ExecutionContext будет реагировать на блокировку вызовов (помеченных как таковые), создавая новые потоки и, таким образом, не перегружается с запуском Future s.

Теперь рассмотрим этот пример с фиксированным пулом из 4 потоков:

import java.util.concurrent.Executors
val executorService = Executors.newFixedThreadPool(4)
val ec = ExecutionContext.fromExecutorService(executorService)

(0 to 100) foreach { n =>
    Future {
        println("starting Future: " + n)
        blocking { Thread.sleep(3000) }
        println("ending Future: " + n)
    }(ec)
}

Этот ExecutionContext не создан для обработки нереста новых потоков, поэтому даже с моим кодом блокировки, окруженным blocking, вы можете видеть, что он будет выполнять только не более 4 Future за раз. И поэтому мы говорим, что это "может улучшить производительность или избежать взаимоблокировок" - это не гарантируется. Как мы видим в последнем ExecutionContext, это не гарантируется вообще.

Как это работает? Как указано, blocking выполняет этот код:

BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)

BlockContext.current извлекает BlockContext из текущего потока, видя здесь. A BlockContext обычно просто a Thread с признаком BlockContext, смешанным в. Как видно из источника, он либо сохраняется в ThreadLocal, либо если он не найден там, это шаблон, сопоставленный с текущая нить. Если текущий поток не является BlockContext, вместо него используется DefaultBlockContext.

Далее, blockOn вызывается в текущем BlockContext. blockOn является абстрактным методом в BlockContext, поэтому его реализация зависит от того, как его обрабатывает ExecutionContext. Если мы рассмотрим реализацию для DefaultBlockContext (когда текущий поток не является BlockContext), мы видим, что blockOn на самом деле ничего не делает. Поэтому использование blocking в BlockContext означает, что ничего особенного не делается вообще, и код запускается как есть, без побочных эффектов.

Как насчет потоков BlockContext s? Например, в контексте global, который отображается здесь , blockOn выполняется совсем немного. Копая глубже, вы можете видеть, что он использует ForkJoinPool под капотом, а DefaultThreadFactory, определенный в том же фрагменте, который используется для нереста новых потоков в ForkJoinPool. Без реализации blockOn из BlockContext (потока), ForkJoinPool не знает, что вы блокируете, и не будет пытаться вызвать больше потоков в ответ.

Scala Await, использует blocking для его реализации.