Основываясь на наших экспериментах, мы видим, что внутренние затраты на внутреннюю обработку Spark Streaming занимают значительное количество времени, когда государство становится более миллиона объектов. В результате возникает латентность, потому что нам нужно увеличить интервал между партиями, чтобы избежать нестабильного поведения (время обработки > пакетный интервал).
Это не имеет никакого отношения к специфике нашего приложения, так как оно может быть воспроизведено кодом ниже.
Каковы именно те внутренние затраты на обработку/инфраструктуру Spark, которые занимают много времени для обработки состояния пользователя? Существуют ли какие-либо опции для уменьшения времени обработки, кроме простого увеличения интервала между пакетами?
Мы планировали широко использовать состояние: не менее 100 МБ или около того на каждом из нескольких узлов, чтобы хранить все данные в памяти и выгружать их один раз в час.
Увеличение интервала между партиями помогает, но мы хотим, чтобы минимальный интервал между пакетами.
Причина, вероятно, заключается не в пространстве, занимаемом состоянием, а скорее в большом графе объектов, потому что, когда мы изменили список на большой массив примитивов, проблема исчезла.
Просто догадаться: он может иметь какое-то отношение к org.apache.spark.util.SizeEstimator
, используемому внутренне Spark, потому что он время от времени появляется при профилировании.
Вот простая демонстрация, чтобы воспроизвести изображение выше на современном iCore7:
- менее 15 МБ состояния
- нет потока во всех
- самая быстрая возможная (фиктивная) функция updateStateByKey
- пакетный интервал 1 секунда
- контрольная точка (требуется Spark, должна иметь) на локальный диск
- проверено как локально, так и на YARN
код:
package spark;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class SlowSparkStreamingUpdateStateDemo {
// Very simple state model
static class State implements Serializable {
final List<String> data;
State(List<String> data) {
this.data = data;
}
}
public static void main(String[] args) {
SparkConf conf = new SparkConf()
// Tried KryoSerializer, but it does not seem to help much
//.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setMaster("local[*]")
.setAppName(SlowSparkStreamingUpdateStateDemo.class.getName());
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
javaStreamingContext.checkpoint("checkpoint"); // a must (if you have stateful operation)
List<Tuple2<String, State>> initialRddGeneratedData = prepareInitialRddData();
System.out.println("Estimated size, bytes: " + SizeEstimator.estimate(initialRddGeneratedData));
JavaPairRDD<String, State> initialRdd = javaStreamingContext.sparkContext().parallelizePairs(initialRddGeneratedData);
JavaPairDStream<String, State> stream = javaStreamingContext
.textFileStream(".") // fake: effectively, no input at all
.mapToPair(input -> (Tuple2<String, State>) null) // fake to get JavaPairDStream
.updateStateByKey(
(inputs, maybeState) -> maybeState, // simplest possible dummy function
new HashPartitioner(javaStreamingContext.sparkContext().defaultParallelism()),
initialRdd); // set generated state
stream.foreachRDD(rdd -> { // simplest possible action (required by Spark)
System.out.println("Is empty: " + rdd.isEmpty());
return null;
});
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}
private static List<Tuple2<String, State>> prepareInitialRddData() {
// 'stateCount' tuples with value = list of size 'dataListSize' of strings of length 'elementDataSize'
int stateCount = 1000;
int dataListSize = 200;
int elementDataSize = 10;
List<Tuple2<String, State>> initialRddInput = new ArrayList<>(stateCount);
for (int stateIdx = 0; stateIdx < stateCount; stateIdx++) {
List<String> stateData = new ArrayList<>(dataListSize);
for (int dataIdx = 0; dataIdx < dataListSize; dataIdx++) {
stateData.add(RandomStringUtils.randomAlphanumeric(elementDataSize));
}
initialRddInput.add(new Tuple2<>("state" + stateIdx, new State(stateData)));
}
return initialRddInput;
}
}