Подтвердить что ты не робот

Контрольные точки Spark streaming для DStreams

В Spark Streaming возможно (и обязательно, если вы собираетесь использовать операции с сохранением состояния), чтобы установить StreamingContext для выполнения контрольных точек в надежное хранилище данных (S3, HDFS,...) of (AND):

  • Метаданные
  • DStream lineage

Как описано здесь, чтобы установить хранилище выходных данных, вам нужно позвонить yourSparkStreamingCtx.checkpoint(datastoreURL)

С другой стороны, можно установить интервалы контрольной точки линии для каждого DataStream, просто позвонив checkpoint(timeInterval) в них. Фактически, рекомендуется установить интервал контрольной точки линии между 5 и 10 раз скользящим интервалом DataStream:

dstream.checkpoint(checkpointInterval). Как правило, контрольно-пропускной пункт интервал 5 - 10 интервалов скольжения DStream - хорошая настройка для попробуйте.

Мой вопрос:

Когда контекст потоковой передачи настроен на выполнение контрольной точки и no ds.checkpoint(interval) называется, включена ли контрольная точка линии для всех потоков данных со значением по умолчанию checkpointInterval, равным batchInterval? Или, наоборот, только метаданные проверяют, что включено?

4b9b3361

Ответ 1

Проверка кода искры (v1.5) Я обнаружил, что контрольная точка DStream s 'включена в двух случаях:

Явным вызовом метода checkpoint (не StreamContext):

/**
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed
*/
def checkpoint(interval: Duration): DStream[T] = {
    if (isInitialized) {
        throw new UnsupportedOperationException(
            "Cannot change checkpoint interval of an DStream after streaming context has started")
    }
    persist()
    checkpointDuration = interval
    this
}

При инициализации DStream, пока конкретный подкласс DStream переопределит атрибут mustCheckpoint (установив его на true):

 private[streaming] def initialize(time: Time) {
  ...
  ...   
   // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
   if (mustCheckpoint && checkpointDuration == null) {
     checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
   }
  ...

Первый случай очевиден. Выполнение наивного анализа кода Spark Streaming:

grep "val mustCheckpoint = true" $(find -type f -name "*.scala")

> ./org/apache/spark/streaming/api/python/PythonDStream.scala:  override     val mustCheckpoint = true
>./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala:  override val mustCheckpoint = true
>./org/apache/spark/streaming/dstream/StateDStream.scala:  override val mustCheckpoint = true

Я могу обнаружить, что в общем случае (игнорирование PythonDStream) контрольная точка StreamingContext позволяет только контрольные точки на линии для экземпляров StateDStream и ReducedWindowedDStream. Эти экземпляры являются результатом преобразований (соответственно, AND):

  • updateStateByKey. То есть поток, предоставляющий состояние через несколько окон.
  • reduceByKeyAndWindow