Подтвердить что ты не робот

Как разделить RDD на два или более RDD?

Я ищу способ разделить RDD на два или более RDD. Самое близкое, что я видел, это Scala Искра: Разделить коллекцию на несколько RDD?, которая по-прежнему является единственным RDD.

Если вы знакомы с SAS, что-то вроде этого:

data work.split1, work.split2;
    set work.preSplit;

    if (condition1)
        output work.split1
    else if (condition2)
        output work.split2
run;

что привело к двум различным наборам данных. Он должен быть немедленно сохранен, чтобы получить результаты, которые я намерен...

4b9b3361

Ответ 1

Невозможно получить несколько RDD из одного преобразования *. Если вы хотите разделить RDD, вы должны применить filter для каждого условия разделения. Например:

def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))

rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

Если у вас есть только двоичное условие, и вычисления дороги, вы можете предпочесть что-то вроде этого:

kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()

rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()

Это означает только одно вычисление предикатов, но требует дополнительного прохождения по всем данным.

Важно отметить, что до тех пор, пока входной RDD должным образом кэшируется, и нет никаких дополнительных предположений относительно распределения данных, нет существенной разницы, когда дело доходит до временной сложности между повторным фильтром и циклом с вложенным if-else.

С N элементами и условиями M число операций, которые вы должны выполнить, явно пропорционально N раз M. В случае петли для петли он должен быть ближе к (N + MN)/2, а повторный фильтр - это точно NM, но при В конце дня это не что иное, как O (NM). Вы можете увидеть мое обсуждение ** с Джейсоном Лендерманом, чтобы прочитать о некоторых плюсах и минусах.

На очень высоком уровне вы должны рассмотреть две вещи:

  • Преобразования искры ленивы, пока вы не выполните действие, которое ваше RDD не реализовано

    Почему это имеет значение? Вернемся к моему примеру:

    rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
    

    Если позже я решит, что мне нужно только rdd_odd, тогда нет причин для материализации rdd_even.

    Если вы посмотрите на пример SAS для вычисления work.split2, вам необходимо материализовать как входные данные, так и work.split1.

  • RDD предоставляют декларативный API. Когда вы используете filter или map, это полностью зависит от двигателя Spark, как эта операция выполняется. Пока функции, переданные преобразованиям, свободны от побочных эффектов, он создает множество возможностей для оптимизации всего конвейера.

В конце дня этот случай не является особым, чтобы оправдать собственное преобразование.

Эта карта с шаблоном фильтра фактически используется в ядре Spark. См. Мой ответ на Как Sparks RDD.randomSplit фактически разделяет RDD и релевантная часть метода randomSplit.

Если единственная цель состоит в достижении разделения на входе, то можно использовать предложение partitionBy для DataFrameWriter, формат текста:

def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  .write.partitionBy($"key").format("text").save(...)

* В Spark есть только 3 основных типа преобразований:

  • RDD [T] = > RDD [T]
  • RDD [T] = > RDD [U]
  • (RDD [T], RDD [U]) = > RDD [W]

где T, U, W могут быть либо атомными типами, либо products/tuples (K, V). Любая другая операция должна быть выражена с использованием некоторой комбинации вышеизложенного. Вы можете проверить исходную документацию RDD для получения более подробной информации.

** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

*** См. также Scala Искра: разделите коллекцию на несколько RDD?

Ответ 2

В качестве других плакатов, упомянутых выше, нет единого встроенного RDD-преобразования, которое разделяет RDD, но здесь есть некоторые "мультиплексные" операции, которые могут эффективно эмулировать большое разнообразие "расщепления" на RDD без чтения нескольких раз:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions

Некоторые методы, характерные для случайного расщепления:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions

Способы доступны из проекта silex с открытым исходным кодом:

https://github.com/willb/silex

Сообщение в блоге, объясняющее, как они работают:

http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/

def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}

def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}

Как упоминалось в других источниках, эти методы включают в себя компромисс памяти для скорости, потому что они работают, вычисляя целые результаты разделов "нетерпеливо", а не "лениво". Таким образом, эти методы могут возникать в задачах памяти на больших разделах, где более традиционные ленивые преобразования не будут.

Ответ 3

Если вы разделите RDD с помощью randomSplit API, вы получите массив RDD.

Если вы хотите вернуть 5 RDD, переходите к 5 значениям веса.

например.

val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4)
val seedValue = 5
val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue)

splitRDD(1).collect()
res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100)

Ответ 4

Один из способов - использовать пользовательский разделитель для разделения данных в зависимости от вашего условия фильтра. Это может быть достигнуто путем расширения Partitioner и реализации чего-то похожего на RangePartitioner.

Затем можно разбивать разбиения на карты для создания нескольких RDD из секционированного RDD без чтения всех данных.

val filtered = partitioned.mapPartitions { iter => {

  new Iterator[Int](){
    override def hasNext: Boolean = {
      if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {
        false
      } else {
        iter.hasNext
      }
    }

    override def next():Int = iter.next()
  }

Просто имейте в виду, что количество разделов в отфильтрованных RDD будет таким же, как число в секционированном RDD, поэтому для уменьшения этого и сокращения пустых разделов следует использовать коалесценцию.