Подтвердить что ты не робот

EsRejectedExecutionException в elasticsearch для параллельного поиска

Я запрашиваю elasticsearch для нескольких параллельных запросов, используя один экземпляр транспортного клиента в моем приложении.

Я получил следующее исключение для параллельного выполнения. Как решить проблему.

org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on [email protected]5f804c60
    at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:509)
    at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteScan(SearchServiceTransportAction.java:441)
    at org.elasticsearch.action.search.type.TransportSearchScanAction$AsyncAction.sendExecuteFirstPhase(TransportSearchScanAction.java:68)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.performFirstPhase(TransportSearchTypeAction.java:171)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.start(TransportSearchTypeAction.java:153)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:52)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:42)
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
    at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:107)
    at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:43)
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
    at org.elasticsearch.action.search.TransportSearchAction$TransportHandler.messageReceived(TransportSearchAction.java:124)
    at org.elasticsearch.action.search.TransportSearchAction$TransportHandler.messageReceived(TransportSearchAction.java:113)
    at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:212)
    at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:109)
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:74)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
4b9b3361

Ответ 1

В Elasticsearch есть пул потоков и очередь для поиска по node. В пуле потоков будет N количество рабочих, готовых передать запросы. Когда приходит запрос и если работник свободен, это обрабатывается работником. Теперь по умолчанию количество рабочих равно числу ядер на этом CPU. Когда рабочие заполнены и больше запросов на поиск, запрос будет отправлен в очередь. Размер очереди также ограничен. Если по умолчанию размер составляет, скажем, 100, и если происходит больше параллельных запросов, чем это, тогда эти запросы будут отклонены, как вы можете видеть в журнале ошибок.

Решения:

  • Ближайшим решением для этого было бы увеличить размер очередь поиска. Мы также можем увеличить размер threadpool, но тогда это может сильно повлиять на производительность отдельных запросы. Таким образом, увеличение очереди может быть хорошей идеей. Но потом помните, что эта очередь - это жилая память и слишком большой размер очереди может привести к проблемам с памятью. (больше Информация)

  • Увеличьте количество узлов и реплик. Помните, что каждый node имеет свои собственный поиск threadpool/queue. Кроме того, поиск может происходить на первичном осколок или реплики.

Ответ 2

Возможно, это звучит странно, но вам нужно уменьшить количество параллельных поисков. За этим исключением Elasticsearch сообщает, что вы его перегружаете. В Elasticsearch установлены некоторые ограничения (на уровне подсчета потоков), и в большинстве случаев значения по умолчанию для этих пределов являются наилучшим вариантом. Итак, если вы тестируете свой кластер, чтобы увидеть, какую нагрузку он может выдержать, это будет показателем того, что некоторые пределы были достигнуты.

В качестве альтернативы, если вы действительно хотите изменить значение по умолчанию, вы можете попытаться увеличить размер очереди для поиска, чтобы удовлетворить требования параллелизма, но имейте в виду, что чем больше размер очереди, тем большее давление вы оказываете на кластер, что, в конце концов, вызовет нестабильность.

Ответ 3

Я видел эту же ошибку, потому что я отправлял много запросов на индексирование в ES параллельно. Поскольку я пишу миграцию данных, было достаточно легко сделать их серийными, и это решило проблему.

Ответ 4

Я не знаю, какова была конфигурация вашего узла, но размер вашей очереди (1000) уже находится на более высокой стороне. Как уже объяснили другие, ваши поисковые запросы ставятся в очередь в очереди пула потоков Elasticsearch. Даже после такого большого размера очереди, если вы получаете отклонения, это дает некоторый намек на то, что вам нужно пересмотреть шаблон запроса.

Как и во многих других разработках, даже в этом случае не существует универсального решения. Я обнаружил, что это очень хороший пост о том, как работает эта очередь, и о различных способах проведения теста производительности, чтобы выяснить, что лучше всего подходит для вашего варианта использования.

НТН!