Подтвердить что ты не робот

Spark-кластер выходит из строя при больших входах, хорошо работает для небольших

Я играю с Спарком. Это стандартное, предварительно построенное распределение (0.7.0) с веб-сайта, с конфигурацией по умолчанию, режимом кластера, одним рабочим (мой localhost). Я прочитал документы об установке, и все кажется прекрасным.

У меня есть файл CSV (различные размеры, 1000 - 1 миллион строк). Если я запустил приложение с небольшим входным файлом (например, 1000 строк), все в порядке, программа выполняется за считанные секунды и выдает ожидаемый результат. Но когда я поставляю больший файл (100 000 строк или 1 миллион), выполнение не выполняется. Я пытался копаться в журналах, но не очень помог (он повторяет весь процесс примерно 9-10 раз, а exitst с неудачей после этого. Также есть некоторая ошибка, связанная с извлечением из некоторого нулевого источника).

Результат Iterable, возвращаемый первым JavaRDD, является подозрительным для меня. Если я верну жесткокодированный одноэлементный список (например, res.add( "something" ), return res;), все в порядке, даже с миллионом строк. Но если я добавлю все свои ключи, я хочу (28 строк длиной 6-20 символов), процесс завершится неудачей только с большим вводом. Проблема в том, что мне нужны все эти ключи, это настоящая бизнес-логика.

Я использую Linux amd64, четырехъядерный ядро, 8 ГБ оперативной памяти. Последняя версия Oracle Java7 JDK. Конфигурация Spark:

SPARK_WORKER_MEMORY=4g
SPARK_MEM=3g
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar

Я должен отметить, что когда я запускаю программу, он говорит:

13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1)
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address

Вот моя программа. Он основан на примере JavaWordCount, минимально измененном.

public final class JavaWordCount
{
    public static void main(final String[] args) throws Exception
    {
        final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
            System.getenv("SPARK_HOME"), new String[] {"....jar" });

        final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() {

            @Override
            public Iterable<String> call(final String s)
            {
                // parsing "s" as the line, computation, building res (it a List<String>)
                return res;
            }
        });

        final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(final String s)
            {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(final Integer i1, final Integer i2)
            {
                return i1 + i2;
            }
        });

        counts.collect();

        for (Tuple2<?, ?> tuple : counts.collect()) {
            System.out.println(tuple._1 + ": " + tuple._2);
        }
    }
}
4b9b3361

Ответ 1

Мне удалось исправить это, установив true для свойства spark.mesos.coarse значение true. Подробнее здесь.

Обновление. Я пару раз занимаюсь Spark. Эти настройки немного помогли мне, но почти невозможно обработать ~ 10 миллионов строк текста на одной машине.

System.setProperty("spark.serializer", "spark.KryoSerializer"); // kryo is much faster
System.setProperty("spark.kryoserializer.buffer.mb", "256"); // I serialize bigger objects
System.setProperty("spark.mesos.coarse", "true"); // link provided
System.setProperty("spark.akka.frameSize", "500"); // workers should be able to send bigger messages
System.setProperty("spark.akka.askTimeout", "30"); // high CPU/IO load

Примечание. Увеличение размера кадра кажется особенно полезным при предотвращении: org.apache.spark.SparkException: Error communicating with MapOutputTracker