Подтвердить что ты не робот

Что происходит с `unionAll` Spark` DataFrame`?

Используя Spark 1.5.0 и учитывая следующий код, я ожидаю unionAll для объединения DataFrame на основе их имени столбца. В коде я использую некоторый FunSuite для передачи в SparkContext sc:

object Entities {

  case class A (a: Int, b: Int)
  case class B (b: Int, a: Int)

  val as = Seq(
    A(1,3),
    A(2,4)
  )

  val bs = Seq(
    B(5,3),
    B(6,4)
  )
}

class UnsortedTestSuite extends SparkFunSuite {

  configuredUnitTest("The truth test.") { sc =>
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val aDF = sc.parallelize(Entities.as, 4).toDF
    val bDF = sc.parallelize(Entities.bs, 4).toDF
    aDF.show()
    bDF.show()
    aDF.unionAll(bDF).show
  }
}

Вывод:

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
+---+---+

+---+---+
|  b|  a|
+---+---+
|  5|  3|
|  6|  4|
+---+---+

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
|  5|  3|
|  6|  4|
+---+---+

Почему результат содержит смешанные столбцы "b" и "a" , а не выравнивание столбцов по именам столбцов? Звучит как серьезная ошибка!?

4b9b3361

Ответ 1

Это не похоже на ошибку. Вы видите стандартное поведение SQL и все основные RDMBS, включая PostgreSQL, MySQL, Oracle и MS SQL ведет себя точно тоже самое. Вы найдете примеры SQL Fiddle, связанные с именами.

Чтобы процитировать Руководство по PostgreSQL:

Чтобы вычислить объединение, пересечение или разность двух запросов, два запроса должны быть "совместимыми с профсоюзом", что означает, что они возвращают одинаковое количество столбцов, а соответствующие столбцы имеют совместимые типы данных

Имена столбцов, исключая первую таблицу в заданной операции, просто игнорируются.

Это поведение возникает непосредственно из реляционной алгебры, где основным строительным блоком является кортеж. Поскольку кортежи упорядочены, объединение двух наборов кортежей эквивалентно (игнорируя обработку дубликатов) на вывод, который вы получаете здесь.

Если вы хотите совместить имена, вы можете сделать что-то вроде этого

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
  val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
  a.select(columns: _*).unionAll(b.select(columns: _*))
}

Чтобы проверить оба имени и типа, должно быть достаточно заменить columns на:

a.dtypes.toSet.intersect(b.dtypes.toSet).map{case (c, _) => col(c)}.toSeq

Ответ 2

Эта проблема исправляется в spark2.3. Они добавляют поддержку unionByName в наборе данных.

https://issues.apache.org/jira/browse/SPARK-21043

Ответ 3

Как обсуждалось в SPARK-9813, кажется, что до тех пор, пока типы данных и количество столбцов одинаковы во всех кадрах, unionAll работа должна работать. Пожалуйста, просмотрите комментарии для дополнительного обсуждения.