Подтвердить что ты не робот

Spark: какая лучшая стратегия для объединения RDD с 2 ключами с однократным RDD?

У меня есть два RDD, к которым я хочу присоединиться, и они выглядят так:

val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]

Случается, что ключевые значения rdd1 уникальны, а также то, что значения tuple-key rdd2 уникальны. Я хотел бы присоединиться к двум наборам данных, чтобы получить следующий rdd:

val rdd_joined:RDD[((T,W), (U,V))]

Какой самый эффективный способ достичь этого? Вот несколько идей, о которых я думал.

Вариант 1:

val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})

Вариант 2:

val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)

Вариант 1 будет собирать все данные для мастеринга, правильно? Таким образом, это не похоже на хороший вариант, если rdd1 большой (он относительно большой в моем случае, хотя на порядок меньше rdd2). Вариант 2 делает уродливый отчетливый и декартова продукт, который также кажется очень неэффективным. Еще одна возможность, которая пришла мне в голову (но еще не пробовала), - это сделать вариант 1 и трансляцию карты, хотя лучше было бы транслировать "умным" способом, чтобы ключи карты были расположены вместе с ключи rdd2.

Кто-нибудь раньше сталкивался с подобной ситуацией? Я был бы рад получить ваши мысли.

Спасибо!

4b9b3361

Ответ 1

Один из вариантов заключается в том, чтобы выполнить широковещательное соединение, собирая rdd1 в драйвер и передавая его всем картографам; сделано правильно, это позволит нам избежать дорогого перетасовки большого rdd2 RDD:

val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))

val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
val joined = rdd2.mapPartitions({ iter =>
  val m = rdd1Broadcast.value
  for {
    ((t, w), u) <- iter
    if m.contains(t)
  } yield ((t, w), (u, m.get(t).get))
}, preservesPartitioning = true)

preservesPartitioning = true сообщает Спарку, что эта функция карты не изменяет ключи rdd2; это позволит Spark избежать повторного разбиения на разделы rdd2 для любых последующих операций, которые соединяются на основе ключа (t, w).

Эта передача может быть неэффективной, поскольку она связана с узким местом связи в драйвере. В принципе, можно передавать один RDD другому без привлечения водителя; У меня есть прототип этого, который я бы хотел обобщить и добавить в Spark.

Другим вариантом является повторная карта ключей rdd2 и использование метода Spark join; это будет включать полную перетасовку rdd2 (и, возможно, rdd1):

rdd1.join(rdd2.map {
  case ((t, w), u) => (t, (w, u))
}).map {
  case (t, (v, (w, u))) => ((t, w), (u, v))
}.collect()

В моем примере ввода оба этих метода дают один и тот же результат:

res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))

Третий вариант - реструктурировать rdd2, чтобы t был его ключом, а затем выполнил указанное выше соединение.

Ответ 2

Другой способ сделать это - создать пользовательский разделитель, а затем использовать zipPartitions для присоединения к вашим RDD.

import org.apache.spark.HashPartitioner

class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {

  override def getPartition(key: Any): Int = key match {
    case k: Tuple2[Int, String] => super.getPartition(k._1)
    case _ => super.getPartition(key)
  }

}

val numSplits = 8
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits))

val result = rdd2.zipPartitions(rdd1)(
  (iter2, iter1) => {
    val m = iter1.toMap
    for {
        ((t: Int, w), u) <- iter2
        if m.contains(t)
      } yield ((t, w), (u, m.get(t).get))
  }
).partitionBy(new HashPartitioner(numSplits))

result.glom.collect