Подтвердить что ты не робот

Spark Streaming: почему внутренние затраты на обработку настолько высоки, что они обрабатывают состояние пользователя в несколько МБ?

Основываясь на наших экспериментах, мы видим, что внутренние затраты на внутреннюю обработку Spark Streaming занимают значительное количество времени, когда государство становится более миллиона объектов. В результате возникает латентность, потому что нам нужно увеличить интервал между партиями, чтобы избежать нестабильного поведения (время обработки > пакетный интервал).

Это не имеет никакого отношения к специфике нашего приложения, так как оно может быть воспроизведено кодом ниже.

Каковы именно те внутренние затраты на обработку/инфраструктуру Spark, которые занимают много времени для обработки состояния пользователя? Существуют ли какие-либо опции для уменьшения времени обработки, кроме простого увеличения интервала между пакетами?

Мы планировали широко использовать состояние: не менее 100 МБ или около того на каждом из нескольких узлов, чтобы хранить все данные в памяти и выгружать их один раз в час.

Увеличение интервала между партиями помогает, но мы хотим, чтобы минимальный интервал между пакетами.

Причина, вероятно, заключается не в пространстве, занимаемом состоянием, а скорее в большом графе объектов, потому что, когда мы изменили список на большой массив примитивов, проблема исчезла.

Просто догадаться: он может иметь какое-то отношение к org.apache.spark.util.SizeEstimator, используемому внутренне Spark, потому что он время от времени появляется при профилировании.

введите описание изображения здесь

Вот простая демонстрация, чтобы воспроизвести изображение выше на современном iCore7:

  • менее 15 МБ состояния
  • нет потока во всех
  • самая быстрая возможная (фиктивная) функция updateStateByKey
  • пакетный интервал 1 секунда
  • контрольная точка (требуется Spark, должна иметь) на локальный диск
  • проверено как локально, так и на YARN

код:

package spark;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SlowSparkStreamingUpdateStateDemo {

    // Very simple state model
    static class State implements Serializable {
        final List<String> data;
        State(List<String> data) {
            this.data = data;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                // Tried KryoSerializer, but it does not seem to help much
                //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .setMaster("local[*]")
                .setAppName(SlowSparkStreamingUpdateStateDemo.class.getName());

        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
        javaStreamingContext.checkpoint("checkpoint"); // a must (if you have stateful operation)

        List<Tuple2<String, State>> initialRddGeneratedData = prepareInitialRddData();
        System.out.println("Estimated size, bytes: " + SizeEstimator.estimate(initialRddGeneratedData));
        JavaPairRDD<String, State> initialRdd = javaStreamingContext.sparkContext().parallelizePairs(initialRddGeneratedData);

        JavaPairDStream<String, State> stream = javaStreamingContext
                .textFileStream(".") // fake: effectively, no input at all
                .mapToPair(input -> (Tuple2<String, State>) null) //  fake to get JavaPairDStream
                .updateStateByKey(
                        (inputs, maybeState) -> maybeState, // simplest possible dummy function
                        new HashPartitioner(javaStreamingContext.sparkContext().defaultParallelism()),
                        initialRdd); // set generated state

        stream.foreachRDD(rdd -> { // simplest possible action (required by Spark)
            System.out.println("Is empty: " + rdd.isEmpty());
            return null;
        });

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }

    private static List<Tuple2<String, State>> prepareInitialRddData() {
        // 'stateCount' tuples with value = list of size 'dataListSize' of strings of length 'elementDataSize'
        int stateCount = 1000;
        int dataListSize = 200;
        int elementDataSize = 10;
        List<Tuple2<String, State>> initialRddInput = new ArrayList<>(stateCount);
        for (int stateIdx = 0; stateIdx < stateCount; stateIdx++) {
            List<String> stateData = new ArrayList<>(dataListSize);
            for (int dataIdx = 0; dataIdx < dataListSize; dataIdx++) {
                stateData.add(RandomStringUtils.randomAlphanumeric(elementDataSize));
            }
            initialRddInput.add(new Tuple2<>("state" + stateIdx, new State(stateData)));
        }
        return initialRddInput;
    }

}
4b9b3361

Ответ 1

Государственное управление было улучшено в искровом разряде 1.6.
см. SPARK-2629 Улучшенное управление состоянием для Spark Streaming;

И в подробной спецификации дизайна:
Улучшено управление состоянием в Spark Streaming

Один из недостатков производительности описан ниже:

Необходимость более оптимизированного управления состоянием, которое не сканирует каждый ключ Текущий updateStateByKey сканирует каждый ключ в каждом периодическом интервале, даже если для этого ключа нет данных. Хотя эта семантика полезна, это некоторые рабочие нагрузки, для большинства рабочих нагрузок требуется только "сканирование и обновление состояния, для которого есть новые данные. И только небольшой процент от всего состояния нужно коснуться для этого в каждом интервале пакетной обработки. The cogroup-based implementation of updateStateByKey is not designed for this; cogroup scans all the keys every time. In fact, this causes the batch processing times of updateStateByKey to increase with the number of keys in the state, even if the data rate stays fixed. введите описание изображения здесь