Подтвердить что ты не робот

Контроль размера очередей цикла событий Netty

Мы реализовали мониторинг очередей циклов событий Netty, чтобы понять проблемы с некоторыми из наших модулей Netty. Монитор использует метод io.netty.util.concurrent.SingleThreadEventExecutor#pendingTasks, который работает для большинства модулей, но для модуля, который обрабатывает несколько тысяч HTTP-запросов в секунду, кажется, что он висел или очень медленный. Теперь я понимаю, что документы строго указывают, что это может быть проблемой, и я чувствую себя довольно хромой... поэтому я ищу другой способ реализовать этот монитор.

Здесь вы можете увидеть старый код: https://github.com/outbrain/ob1k/blob/6364187b30cab5b79d64835131d9168c754f3c09/ob1k-core/src/main/java/com/outbrain/ob1k/common/metrics/NettyQueuesGaugeBuilder.java

  public static void registerQueueGauges(final MetricFactory factory, final EventLoopGroup elg, final String componentName) {

    int index = 0;
    for (final EventExecutor eventExecutor : elg) {
      if (eventExecutor instanceof SingleThreadEventExecutor) {
        final SingleThreadEventExecutor singleExecutor = (SingleThreadEventExecutor) eventExecutor;
        factory.registerGauge("EventLoopGroup-" + componentName, "EventLoop-" + index, new Gauge<Integer>() {
          @Override
          public Integer getValue() {
            return singleExecutor.pendingTasks();
          }
        });

        index++;
      }
    }
  }

Мой вопрос, есть ли лучший способ контролировать размеры очереди?

Это может быть весьма полезной метрикой, поскольку ее можно использовать для понимания латентности, а также для использования в некоторых случаях обратного давления.

4b9b3361

Ответ 1

Вероятно, вам нужно будет отслеживать изменения как задачи, как добавленные и удаленные из экземпляров SingleThreadEventExecutor.

Для этого вы можете создать класс, который обертывает и/или расширяет SingleThreadEventExecutor. Тогда у вас будет java.util.concurrent.atomic.AtomicInteger, который вы вызываете incrementAndGet() каждый раз при добавлении новой задачи и decrementAndGet() каждый раз, когда вы удаляете/завершаете.

Тогда AtomicInteger предоставит вам текущее количество ожидающих задач. Вы могли бы, вероятно, переопределить pendingTasks(), чтобы использовать это значение вместо этого (хотя будьте осторожны - я не 100%, у которого не было бы побочных эффектов).

Это добавило бы немного накладных расходов для каждой выполняемой задачи, но сделало бы получение числа ожидающих задач около постоянной скорости.

Недостатком этого является, конечно, что он более инвазивен, чем то, что вы делаете в данный момент, так как вам нужно настроить приложение для использования разных исполнителей событий.

NB. это всего лишь предложение о том, как обойти эту проблему - я специально не делал этого с Netty. Хотя в прошлом я делал это с другим кодом.