Подтвердить что ты не робот

Почему Spark терпит неудачу с java.lang.OutOfMemoryError: превышен ли верхний предел GC?

Я пытаюсь реализовать Hadoop Map/Reduce job, который отлично работал в Spark. Определение приложения Spark следующее:

val data = spark.textFile(file, 2).cache()
val result = data
  .map(//some pre-processing)
  .map(docWeightPar => (docWeightPar(0),docWeightPar(1))))
  .flatMap(line => MyFunctions.combine(line))
  .reduceByKey( _ + _)

Где MyFunctions.combine есть

def combine(tuples: Array[(String, String)]): IndexedSeq[(String,Double)] =
  for (i <- 0 to tuples.length - 2;
       j <- 1 to tuples.length - 1
  ) yield (toKey(tuples(i)._1,tuples(j)._1),tuples(i)._2.toDouble * tuples(j)._2.toDouble)

Функция combine создает множество ключей карты, если список, используемый для ввода, большой, и это то, где выбраны исключения.

В настройке сокращения Hadoop Map у меня не было проблем, потому что это та точка, в которой функция combine дает точку, где Hadoop записывает пары карт на диск. Кажется, что Spark сохраняет все в памяти, пока он не взорвется с помощью java.lang.OutOfMemoryError: GC overhead limit exceeded.

Я, наверное, делаю что-то действительно не так, но я не мог найти никаких указаний о том, как выйти из этого, я хотел бы знать, как я могу избежать этого. Поскольку я являюсь полным noob в Scala и Spark, я не уверен, что проблема одна или другая, или и то, и другое. В настоящее время я пытаюсь запустить эту программу на своем собственном ноутбуке, и она работает для входов, где длина массива tuples не очень длинная. Спасибо заранее.

4b9b3361

Ответ 1

Настройка памяти, вероятно, является хорошим способом, как уже было предложено, потому что это дорогостоящая операция, которая масштабируется уродливо. Но, возможно, некоторые изменения кода помогут.

В вашей комбинированной функции вы можете использовать другой подход, который избегает операторов if с помощью функции combinations. Я бы также преобразовал второй элемент кортежей в парные до операции:

tuples.

    // Convert to doubles only once
    map{ x=>
        (x._1, x._2.toDouble)
    }.

    // Take all pairwise combinations. Though this function
    // will not give self-pairs, which it looks like you might need
    combinations(2).

    // Your operation
    map{ x=>
        (toKey(x{0}._1, x{1}._1), x{0}._2*x{1}._2)
    }

Это даст итератор, который вы можете использовать downstream или, если хотите, преобразовать в список (или что-то) с помощью toList.

Ответ 2

Добавьте следующий JVM arg при запуске spark-shell или spark-submit:

-Dspark.executor.memory=6g

Вы также можете явно указать количество рабочих при создании экземпляра SparkContext:

Распределенный кластер

Задайте имена подчиненных в conf/slaves:

val sc = new SparkContext("master", "MyApp")

Ответ 3

В документации (http://spark.apache.org/docs/latest/running-on-yarn.html) вы можете прочитать, как настроить исполнителей и предел памяти. Например:

--master yarn-cluster --num-executors 10 --executor-cores 3 --executor-memory 4g --driver-memory 5g  --conf spark.yarn.executor.memoryOverhead=409

MemoryOverhead должен быть 10% от памяти исполнителя.

Изменить: Исправлено 4096 - 409 (ниже приведен комментарий ниже)

Ответ 4

Эта ошибка сборки мусора JVM произошла воспроизводимо в моем случае, когда я увеличил значение spark.memory.fraction до значений более 0,6. Поэтому лучше оставить значение по умолчанию, чтобы избежать ошибок сборки мусора JVM. Это также рекомендуется https://forums.databricks.com/info/2202/javalangoutofmemoryerror-gc-overhead-limit-exceede.html.

Для получения дополнительной информации, почему 0.6 является лучшим значением для spark.memory.fraction, см. https://issues.apache.org/jira/browse/SPARK-15796.