Подтвердить что ты не робот

Apache Spark: map vs mapPartitions?

В чем разница между map RDD и методом mapPartitions? И работает ли flatMap как map или как mapPartitions? Благодарю.

(редактировать) то есть какая разница (семантически или с точки зрения исполнения) между

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

А также:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
4b9b3361

Ответ 1

Какая разница между RDD-картой и методом mapPartitions?

Метод map преобразует каждый элемент исходного RDD в один элемент RDD результата, применяя функцию. mapPartitions преобразует каждый раздел исходного RDD в несколько элементов результата (возможно, нет).

И работает ли flatMap как карта или подобные mapPartitions?

Ни flatMap работает на одном элементе (как map) и создает несколько элементов результата (в качестве mapPartitions).

Ответ 2

Настоятельный СОВЕТ:

Всякий раз, когда у вас есть тяжелая инициализация, которую следует выполнить один раз для многих элементов RDD а не один раз для каждого элемента RDD, и если эту инициализацию, такую как создание объектов из сторонней библиотеки, нельзя сериализовать (чтобы Spark мог передавать ее через кластера к рабочим узлам), используйте mapPartitions() вместо map(). mapPartitions() инициализация выполняется один раз для каждой рабочей задачи/потока/раздела, а не один раз для каждого элемента данных RDD например: см. ниже.

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2. flatMap ведет себя как карта или как mapPartitions?

Да. пожалуйста, см. пример 2 flatmap.. это само за себя.

Q1. В чем разница между map RDD и mapPartitions

map работает с функцией, используемой на уровне элемента, в то время как mapPartitions выполняет функцию на уровне раздела.

Пример сценария: если у нас есть 100K элементов в определенном разделе RDD то мы будем запускать функцию, используемую преобразованием отображения 100K раз, когда мы используем map.

И наоборот, если мы используем mapPartitions то мы будем вызывать определенную функцию только один раз, но мы передадим все записи по 100 КБ и получим все ответы за один вызов функции.

Это приведет к увеличению производительности, поскольку map работает с определенной функцией очень много раз, особенно если функция делает что-то дорогостоящее каждый раз, когда в этом нет необходимости, если мы передаем все элементы одновременно (в случае mappartitions).

карта

Применяет функцию преобразования к каждому элементу СДР и возвращает результат как новый СДР.

Листинг Варианты

карта отображения [U: ClassTag] (f: T => U): RDD [U]

Пример:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

Это специализированная карта, которая вызывается только один раз для каждого раздела. Все содержимое соответствующих разделов доступно в виде последовательного потока значений через входной аргумент (Iterarator [T]). Пользовательская функция должна возвращать еще один Iterator [U]. Объединенные итераторы результатов автоматически преобразуются в новый СДР. Обратите внимание, что кортежи (3,4) и (6,7) отсутствуют в следующем результате из-за выбранного нами разбиения.

preservesPartitioning указывает, preservesPartitioning ли функция ввода разделитель, что должно быть false если это не пара RDD, и функция ввода не изменяет ключи.

Листинг Варианты

def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], preservedPartitioning: Boolean = false): RDD [U]

Пример 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Пример 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

Вышеуказанная программа также может быть написана с использованием flatMap следующим образом.

Пример 2 с использованием flatmap

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Заключение:

Преобразование mapPartitions быстрее, чем map поскольку оно вызывает вашу функцию один раз/раздел, а не один раз/элемент.

Дальнейшее чтение: foreach против foreachPartitions Когда использовать Что?

Ответ 3

Карта:

  1. Он обрабатывает одну строку за раз, очень похоже на метод map() MapReduce.
  2. Вы возвращаетесь из трансформации после каждой строки.

MapPartitions

  1. Обрабатывает полный раздел за один раз.
  2. Вы можете вернуться из функции только один раз после обработки всего раздела.
  3. Все промежуточные результаты должны храниться в памяти, пока вы не обработаете весь раздел.
  4. Предоставляет вам как функцию setup() map() и cleanup() функции MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/