Подтвердить что ты не робот

Как выбрать диапазон элементов в Spark RDD?

Я хотел бы выбрать ряд элементов в Spark RDD. Например, у меня есть RDD со сто элементов, и мне нужно выбрать элементы от 60 до 80. Как это сделать?

Я вижу, что RDD имеет метод take (i: int), который возвращает первые элементы i. Но нет соответствующего метода для взятия последних элементов я или элементов из среднего, начинающегося с определенного индекса.

4b9b3361

Ответ 1

Я не думаю, что есть эффективный метод для этого. Но простой способ заключается в использовании filter(), скажем, у вас есть RDD, pairs с парами значений ключа, и вам нужны только элементы от 60 до 80 включительно.

val 60to80 = pairs.filter {
    _ match {
        case (k,v) => k >= 60 && k <= 80
        case _ => false //incase of invalid input
    }
}

Я думаю, что это возможно сделать более эффективно в будущем, используя sortByKey и сохраняя информацию о диапазоне значений, сопоставленных каждому разделу. Имейте в виду, что этот подход сохранил бы только что-нибудь, если вы планировали многократно запрашивать диапазон, потому что сортировка явно дорога.

От взгляда на источник искры, безусловно, можно будет выполнять эффективные запросы диапазона с помощью RangePartitioner:

// An array of upper bounds for the first (partitions - 1) partitions
  private val rangeBounds: Array[K] = {

Это частный член RangePartitioner со знанием всех верхних границ разделов, было бы легко запросить только необходимые разделы. Похоже, это то, что пользователи могут увидеть в будущем: SPARK-911

UPDATE: лучший ответ, основанный на запросе на pull, который я пишу для SPARK-911. Он будет работать эффективно, если RDD будет отсортирован, и вы запросите его несколько раз.

val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]];
val (lower, upper) = (10, 20)
val range = p.getPartition(lower) to p.getPartition(upper)
println(range)
val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => {
  if (range.contains(i))
    for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
  else
    Iterator.empty
}
for((k,v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect()) println(s"$k, $v")

Если весь раздел в памяти допустим, вы можете даже сделать что-то вроде этого. val glommedAndCached = sorted.glom()cache(); glommedAndCached.map(a => a.slice(a.search(lower),a.search(upper)+1)).collect()

search не является членом BTW Я просто сделал неявный класс, который имеет двоичную функцию поиска, не показанную здесь

Ответ 2

Насколько велик ваш набор данных? Вы можете сделать то, что вам нужно:

data.take(80).drop(59)

Это кажется неэффективным, но для данных малого и среднего размера должно работать.

Можно ли решить это по-другому? Какой случай для сбора определенного диапазона из середины ваших данных? Будет ли takeSample служить вам лучше?

Ответ 3

Следующее должно иметь возможность получить диапазон. Обратите внимание, что кеш сохранит вам некоторые издержки, потому что внутренне zipWithIndex нужно сканировать раздел RDD, чтобы получить количество элементов в каждом разделе.

scala>val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3).cache
scala>val r2 = r1.zipWithIndex
scala>val r3 = r2.filter(x=> {x._2>2 && x._2 < 4}).map(x=>x._1)
scala>r3.foreach(println)
d

Ответ 4

Для тех, кто наткнулся на этот вопрос, ища ответ Spark 2.x, вы можете использовать filterByRange