Подтвердить что ты не робот

Как работает функция Distinct() в Spark?

Я новичок в Apache Spark и изучал основные функции. Было небольшое сомнение. Предположим, у меня есть RDD кортежей (ключ, значение) и вы хотите получить из них некоторые уникальные. Я использую функцию distinct(). Мне интересно, на каком основании функция считает, что кортежи как разрозненные.? Он основан на ключах или значениях или обоих?

4b9b3361

Ответ 1

.distinct(), безусловно, делает перетасовку между разделами. Чтобы узнать больше о том, что происходит, запустите .toDebugString на вашем RDD.

val hashPart = new HashPartitioner(<number of partitions>)

val myRDDPreStep = <load some RDD>

val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)

который для примера RDD у меня (myRDDPreStep уже hash-partitioned ключом, сохраняемый StorageLevel.MEMORY_AND_DISK_SER и контрольным), возвращает:

(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
    |    ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
    +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
        |    myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
        |        CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
        |    myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]

Обратите внимание, что могут быть более эффективные способы получения отдельных элементов, которые включают в себя меньшее количество тасований, ОСОБЕННО, если ваш RDD уже разбит на разделы интеллектуальным способом, а разделы не слишком перекошены.

См. Есть ли способ переписать Spark RDD, отличный от использования mapPartitions вместо отдельных? а также Apache Spark: какова эквивалентная реализация RDD.groupByKey() с использованием RDD.aggregateByKey()?

Ответ 2

Документы API для RDD.distinct() содержат только одно предложение:

"Возвращает новый RDD, содержащий отдельные элементы в этом RDD.

Из недавнего опыта я могу сказать вам, что в кортеже-RDD рассматривается кортеж в целом.

Если вам нужны разные ключи или разные значения, то в зависимости от того, что вы хотите выполнить, вы можете:

а. вызовите groupByKey(), чтобы преобразовать {(k1,v11),(k1,v12),(k2,v21),(k2,v22)} в {(k1,[v11,v12]), (k2,[v21,v22])}; или

В. вычеркните либо клавиши, либо значения, вызвав keys() или values(), а затем distinct()

На момент написания этой статьи (июнь 2015 г.) UC Berkeley + EdX запускает бесплатный онлайн-курс Введение в большие данные и Apache Spark, которые будут предоставлять руки на практике с этими функциями.

Ответ 3

Джастин Пихони прав. Distinct использует hashCode и метод equals объектов для этого определения. Он возвращает различные элементы (объект)

val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))

отчетливый

rdd.distinct.collect().foreach(println)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)

Если вы хотите применить отчет о ключе. В этом случае уменьшить на лучший вариант

ReduceBy

 val reduceRDD= rdd.map(tup =>
    (tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2)

reduceRDD.collect().foreach(println)

Выход:-

(2,20)
(1,20)
(3,21)

Ответ 4

distinct использует методы hashCode и equals объектов для этого определения. Наборы построены с механизмами равенства, делегирующими вниз в равенство и положение каждого объекта. Таким образом, distinct будет работать против всего объекта Tuple2. Как указал Павел, вы можете вызвать keys или values, а затем distinct. Или вы можете написать свои собственные значения через aggregateByKey, которые сохранят сопряжение ключей. Или, если вам нужны разные клавиши, вы можете использовать обычный aggregate

Ответ 5

Похоже, что distinct избавится от дубликатов (ключ, значение).

В приведенном ниже примере (1,20) и (2,20) повторяются дважды в myRDD, но после a distinct() дубликаты удаляются.

scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22

scala> myRDD.collect().foreach(println _)
(1,20)
(1,21)
(1,20)
(2,20)
(2,22)
(2,20)
(3,21)
(3,22)

scala> myRDD.distinct.collect().foreach(println _)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)