Я новичок в Apache Spark и изучал основные функции. Было небольшое сомнение. Предположим, у меня есть RDD кортежей (ключ, значение) и вы хотите получить из них некоторые уникальные. Я использую функцию distinct(). Мне интересно, на каком основании функция считает, что кортежи как разрозненные.? Он основан на ключах или значениях или обоих?
Как работает функция Distinct() в Spark?
Ответ 1
.distinct(), безусловно, делает перетасовку между разделами. Чтобы узнать больше о том, что происходит, запустите .toDebugString на вашем RDD.
val hashPart = new HashPartitioner(<number of partitions>)
val myRDDPreStep = <load some RDD>
val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)
который для примера RDD у меня (myRDDPreStep уже hash-partitioned ключом, сохраняемый StorageLevel.MEMORY_AND_DISK_SER и контрольным), возвращает:
(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
| CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
| myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]
Обратите внимание, что могут быть более эффективные способы получения отдельных элементов, которые включают в себя меньшее количество тасований, ОСОБЕННО, если ваш RDD уже разбит на разделы интеллектуальным способом, а разделы не слишком перекошены.
См. Есть ли способ переписать Spark RDD, отличный от использования mapPartitions вместо отдельных? а также Apache Spark: какова эквивалентная реализация RDD.groupByKey() с использованием RDD.aggregateByKey()?
Ответ 2
Документы API для RDD.distinct() содержат только одно предложение:
"Возвращает новый RDD, содержащий отдельные элементы в этом RDD.
Из недавнего опыта я могу сказать вам, что в кортеже-RDD рассматривается кортеж в целом.
Если вам нужны разные ключи или разные значения, то в зависимости от того, что вы хотите выполнить, вы можете:
а. вызовите groupByKey()
, чтобы преобразовать {(k1,v11),(k1,v12),(k2,v21),(k2,v22)}
в {(k1,[v11,v12]), (k2,[v21,v22])}
; или
В. вычеркните либо клавиши, либо значения, вызвав keys()
или values()
, а затем distinct()
На момент написания этой статьи (июнь 2015 г.) UC Berkeley + EdX запускает бесплатный онлайн-курс Введение в большие данные и Apache Spark, которые будут предоставлять руки на практике с этими функциями.
Ответ 3
Джастин Пихони прав. Distinct использует hashCode и метод equals объектов для этого определения. Он возвращает различные элементы (объект)
val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
отчетливый
rdd.distinct.collect().foreach(println)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)
Если вы хотите применить отчет о ключе. В этом случае уменьшить на лучший вариант
ReduceBy
val reduceRDD= rdd.map(tup =>
(tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2)
reduceRDD.collect().foreach(println)
Выход:-
(2,20)
(1,20)
(3,21)
Ответ 4
distinct
использует методы hashCode
и equals
объектов для этого определения. Наборы построены с механизмами равенства, делегирующими вниз в равенство и положение каждого объекта. Таким образом, distinct
будет работать против всего объекта Tuple2
. Как указал Павел, вы можете вызвать keys
или values
, а затем distinct
. Или вы можете написать свои собственные значения через aggregateByKey
, которые сохранят сопряжение ключей. Или, если вам нужны разные клавиши, вы можете использовать обычный aggregate
Ответ 5
Похоже, что distinct
избавится от дубликатов (ключ, значение).
В приведенном ниже примере (1,20) и (2,20) повторяются дважды в myRDD
, но после a distinct()
дубликаты удаляются.
scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22
scala> myRDD.collect().foreach(println _)
(1,20)
(1,21)
(1,20)
(2,20)
(2,22)
(2,20)
(3,21)
(3,22)
scala> myRDD.distinct.collect().foreach(println _)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)