Подтвердить что ты не робот

Искра: Лучшая практика для извлечения больших данных из RDD на локальную машину

У меня есть большой RDD (1gb) в кластере пряжи. На локальной машине, использующей этот кластер, я имею только 512 МБ. Я хотел бы перебрать значения в RDD на моей локальной машине. Я не могу использовать collect(), потому что он создавал бы слишком большой массив локально, что больше, чем моя куча. Мне нужен итеративный способ. Существует метод iterator(), но для этого требуется дополнительная информация, которую я не могу предоставить.

UDP: метод commLocalIterator

4b9b3361

Ответ 1

Обновление: RDD.toLocalIterator метод, появившийся после того, как был написан оригинальный ответ, является более эффективным способом выполнения задания. Он использует runJob для оценки только одного раздела на каждом шаге.

TL; DR И исходный ответ может дать приблизительное представление о том, как это работает:

Прежде всего, получите массив индексов раздела:

val parts = rdd.partitions

Затем создайте меньшие rdds, отфильтровывая все, кроме одного раздела. Собирайте данные из меньших rdds и перебирайте значения одного раздела:

for (p <- parts) {
    val idx = p.index
    val partRdd = rdd.mapPartitionsWithIndex(a => if (a._1 == idx) a._2 else Iterator(), true)
    //The second argument is true to avoid rdd reshuffling
    val data = partRdd.collect //data contains all values from a single partition 
                               //in the form of array
    //Now you can do with the data whatever you want: iterate, save to a file, etc.
}

Я не пробовал этот код, но он должен работать. Пожалуйста, напишите комментарий, если он не будет компилироваться. Из-за этого он будет работать только в том случае, если разделы достаточно малы. Если это не так, вы всегда можете увеличить количество разделов с помощью rdd.coalesce(numParts, true).

Ответ 2

Ответ на Wildfire кажется семантически правильным, но я уверен, что вы должны быть намного эффективнее, используя API Spark. Если вы хотите обработать каждый раздел по очереди, я не понимаю, почему вы не можете использовать операции map/filter/reduce/reduceByKey/mapPartitions. Единственный раз, когда вы хотите иметь все в одном месте в одном массиве, - это когда вы собираетесь выполнять немоноидальную операцию, но это не похоже на то, что вы хотите. Вы должны иметь возможность сделать что-то вроде:

rdd.mapPartitions(recordsIterator => your code that processes a single chunk)

Или это

rdd.foreachPartition(partition => {
  partition.toArray
  // Your code
})

Ответ 3

Вот такой же подход, как предложил @Wildlife, но написанный в pyspark.

Самое приятное в этом подходе - это позволяет пользователям получать записи в RDD по порядку. Я использую этот код для подачи данных из RDD в STDIN процесса машинного обучения.

rdd = sc.parallelize(range(100), 10)
def make_part_filter(index):
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter

for part_id in range(rdd.getNumPartitions()):
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    data_from_part_rdd = part_rdd.collect()
    print "partition id: %s elements: %s" % (part_id, data_from_part_rdd)

Производит вывод:

partition id: 0 elements: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
partition id: 1 elements: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
partition id: 2 elements: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
partition id: 3 elements: [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
partition id: 4 elements: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
partition id: 5 elements: [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
partition id: 6 elements: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
partition id: 7 elements: [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
partition id: 8 elements: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
partition id: 9 elements: [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

Ответ 5

Для Spark 1.3.1 формат выглядит следующим образом

val parts = rdd.partitions
    for (p <- parts) {
        val idx = p.index
        val partRdd = data.mapPartitionsWithIndex { 
           case(index:Int,value:Iterator[(String,String,Float)]) => 
             if (index == idx) value else Iterator()}
        val dataPartitioned = partRdd.collect 
        //Apply further processing on data                      
    }

Ответ 6

pyspark dataframe с помощью RDD.toLocalIterator():

separator  = '|'
df_results = hiveCtx.sql(sql)
columns    = df_results.columns
print separator.join(columns)

# Use toLocalIterator() rather than collect(), as this avoids pulling all of the
# data to the driver at one time.  Rather, "the iterator will consume as much memory
# as the largest partition in this RDD."
MAX_BUFFERED_ROW_COUNT = 10000
row_count              = 0
output                 = cStringIO.StringIO()
for record in df_results.rdd.toLocalIterator():
    d = record.asDict()
    output.write(separator.join([str(d[c]) for c in columns]) + '\n')
    row_count += 1
    if row_count % MAX_BUFFERED_ROW_COUNT== 0:
        print output.getvalue().rstrip()
        # it is faster to create a new StringIO rather than clear the existing one
        # http://stackoverflow.com/info/4330812/how-do-i-clear-a-stringio-object
        output = cStringIO.StringIO()
if row_count % MAX_BUFFERED_ROW_COUNT:
    print output.getvalue().rstrip()