Я пытаюсь реализовать Hadoop Map/Reduce job, который отлично работал в Spark. Определение приложения Spark следующее:
val data = spark.textFile(file, 2).cache()
val result = data
.map(//some pre-processing)
.map(docWeightPar => (docWeightPar(0),docWeightPar(1))))
.flatMap(line => MyFunctions.combine(line))
.reduceByKey( _ + _)
Где MyFunctions.combine
есть
def combine(tuples: Array[(String, String)]): IndexedSeq[(String,Double)] =
for (i <- 0 to tuples.length - 2;
j <- 1 to tuples.length - 1
) yield (toKey(tuples(i)._1,tuples(j)._1),tuples(i)._2.toDouble * tuples(j)._2.toDouble)
Функция combine
создает множество ключей карты, если список, используемый для ввода, большой, и это то, где выбраны исключения.
В настройке сокращения Hadoop Map у меня не было проблем, потому что это та точка, в которой функция combine
дает точку, где Hadoop записывает пары карт на диск. Кажется, что Spark сохраняет все в памяти, пока он не взорвется с помощью java.lang.OutOfMemoryError: GC overhead limit exceeded
.
Я, наверное, делаю что-то действительно не так, но я не мог найти никаких указаний о том, как выйти из этого, я хотел бы знать, как я могу избежать этого. Поскольку я являюсь полным noob в Scala и Spark, я не уверен, что проблема одна или другая, или и то, и другое. В настоящее время я пытаюсь запустить эту программу на своем собственном ноутбуке, и она работает для входов, где длина массива tuples
не очень длинная. Спасибо заранее.