Подтвердить что ты не робот

Spark RDD - как они работают

У меня небольшая программа Scala, которая отлично работает на одном node. Однако я масштабирую его, так что он работает на нескольких узлах. Это моя первая такая попытка. Я просто пытаюсь понять, как RDD работают в Spark, поэтому этот вопрос основан на теории и может быть не на 100% правильным.

Скажем, я создаю RDD: val rdd = sc.textFile(file)

Теперь, когда я это сделал, значит ли это, что файл в file теперь разделен по узлам (если все узлы имеют доступ к пути к файлу)?

Во-вторых, я хочу подсчитать количество объектов в RDD (достаточно просто), однако мне нужно использовать это число в вычислении, которое должно быть применено к объектам в RDD - пример псевдокода:

rdd.map(x => x / rdd.size)

Скажем, есть 100 объектов в rdd и говорят, что есть 10 узлов, поэтому количество 10 объектов на node (при условии, что это так работает концепция RDD), теперь, когда я вызываю метод, каждый node будет выполнять расчет с rdd.size как 10 или 100? Поскольку, в общем случае, размер RDD 100, но локально на каждом node равен всего 10. Должен ли я сделать переменную широковещания до выполнения расчета? Этот вопрос связан с вопросом ниже.

Наконец, если я сделаю преобразование в RDD, например. rdd.map(_.split("-")), а затем мне нужен новый size RDD, мне нужно выполнить действие на RDD, например count(), поэтому вся информация отправляется обратно во драйвер node?

4b9b3361

Ответ 1

Обычно файл (или части файла, если он слишком большой) реплицируется на N узлов в кластере (по умолчанию N = 3 на HDFS). Это не намерение разбивать каждый файл между всеми доступными узлами.

Однако, для вас (то есть клиент), работающего с файлом с использованием Spark, должен быть прозрачным - вы не должны видеть разницы в rdd.size, независимо от того, сколько узлов он разделяет и/или реплицирует. Существуют методы (по крайней мере, в Hadoop), чтобы узнать, какие узлы (части) файла могут быть расположены в данный момент. Однако в простых случаях вам, скорее всего, не понадобится использовать эту функциональность.

UPDATE: статья, описывающая внутренности RDD: https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf

Ответ 2

val rdd = sc.textFile(file)

Означает ли это, что файл теперь разбит на разделы по узлам?

Файл остается там, где он был. Элементы результирующего RDD[String] являются строками файла. RDD разбивается на разделы, чтобы соответствовать естественному разделению базовой файловой системы. Количество разделов не зависит от количества узлов, которые у вас есть.

Важно понимать, что при выполнении этой строки он не читает файл (ы). RDD является ленивым объектом и будет делать что-то только тогда, когда это необходимо. Это здорово, потому что это позволяет избежать ненужного использования памяти.

Например, если вы пишете val errors = rdd.filter(line => line.startsWith("error")), ничего не происходит. Если вы затем напишете val errorCount = errors.count, теперь ваша последовательность операций должна быть выполнена, потому что результат count является целым числом. То, что каждый рабочий ядро ​​(поток исполнителей) будет выполнять параллельно, читает файл (или часть файла), выполняет итерацию по его строкам и подсчитывает строки, начинающиеся с "ошибки". Буферизация и GC в сторону, только одна линия на ядро ​​будет в памяти одновременно. Это позволяет работать с очень большими данными, не используя много памяти.

Я хочу подсчитать количество объектов в RDD, однако мне нужно использовать это число в вычислении, которое должно быть применено к объектам в RDD - пример псевдокода:

rdd.map(x => x / rdd.size)

Нет метода rdd.size. Существует rdd.count, который подсчитывает количество элементов в RDD. rdd.map(x => x / rdd.count) не будет работать. Код попытается отправить переменную rdd всем работникам и завершится с ошибкой NotSerializableException. Что вы можете сделать, это:

val count = rdd.count
val normalized = rdd.map(x => x / count)

Это работает, потому что count является Int и может быть сериализовано.

Если я делаю преобразование в RDD, например. rdd.map(_.split("-")), а затем мне нужен новый размер RDD, мне нужно выполнить действие на RDD, например count(), поэтому вся информация отправляется обратно в драйвер node?

map не изменяет количество элементов. Я не знаю, что вы подразумеваете под "размером". Но да, вам нужно выполнить действие, например count, чтобы получить что-либо из RDD. Вы видите, что никакая работа не выполняется до тех пор, пока вы не выполните действие. (Когда вы выполняете count, только счет на каждый раздел будет отправлен обратно в драйвер, конечно, не "вся информация".)