Подтвердить что ты не робот

Как избежать дублирования столбцов после объединения?

У меня есть два фрейма данных со следующими столбцами:

df1.columns
//  Array(ts, id, X1, X2)

и

df2.columns
//  Array(ts, id, Y1, Y2)

После

val df_combined = df1.join(df2, Seq(ts,id))

Я получаю следующие столбцы: Array(ts, id, X1, X2, ts, id, Y1, Y2). Я мог ожидать, что общие столбцы будут удалены. Есть ли что-то, что необходимо сделать?

4b9b3361

Ответ 1

Простой ответ (из Часто задаваемые вопросы по данным по этому вопросу) заключается в том, чтобы выполнить объединение, в котором объединенные столбцы выражаются как массив строк (или одна строка) вместо предиката.

Ниже приведен пример, связанный с часто задаваемыми параметрами Databricks, но с двумя столбцами соединения, чтобы ответить на исходный вопрос о посте.

Вот кадр слева:

val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10))

val left = llist.toDF("firstname","lastname","date","duration")

left.show()

/*
+---------+--------+----------+--------+
|firstname|lastname|      date|duration|
+---------+--------+----------+--------+
|      bob|       b|2015-01-13|       4|
|    alice|       a|2015-04-23|      10|
+---------+--------+----------+--------+
*/

Вот кадр справа:

val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload")

right.show()

/*
+---------+--------+------+
|firstname|lastname|upload|
+---------+--------+------+
|    alice|       a|   100|
|      bob|       b|    23|
+---------+--------+------+
*/

Ниже приведено неверное решение, где столбцы соединения определяются как предикат left("firstname")===right("firstname") && left("lastname")===right("lastname").

Неправильный результат состоит в том, что столбцы firstname и lastname дублируются в объединенном фрейме данных:

left.join(right, left("firstname")===right("firstname") &&
                 left("lastname")===right("lastname")).show

/*
+---------+--------+----------+--------+---------+--------+------+
|firstname|lastname|      date|duration|firstname|lastname|upload|
+---------+--------+----------+--------+---------+--------+------+
|      bob|       b|2015-01-13|       4|      bob|       b|    23|
|    alice|       a|2015-04-23|      10|    alice|       a|   100|
+---------+--------+----------+--------+---------+--------+------+
*/

Правильное решение состоит в том, чтобы определить столбцы соединения как массив строк Seq("firstname", "lastname"). Кадр выходных данных не имеет дублированных столбцов:

left.join(right, Seq("firstname", "lastname")).show

/*
+---------+--------+----------+--------+------+
|firstname|lastname|      date|duration|upload|
+---------+--------+----------+--------+------+
|      bob|       b|2015-01-13|       4|    23|
|    alice|       a|2015-04-23|      10|   100|
+---------+--------+----------+--------+------+
*/

Ответ 2

Это ожидаемое поведение. Метод DataFrame.join эквивалентен SQL-соединению, как это

SELECT * FROM a JOIN b ON joinExprs

Если вы хотите игнорировать повторяющиеся столбцы, просто отбросьте их или выберите интересующие столбцы впоследствии. Если вы хотите устранить неоднозначность, вы можете использовать доступ к ним, используя родительские DataFrames:

val a: DataFrame = ???
val b: DataFrame = ???
val joinExprs: Column = ???

a.join(b, joinExprs).select(a("id"), b("foo"))
// drop equivalent 
a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo"))

или используйте псевдонимы:

// As for now aliases don't work with drop
a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo")

Для равных объединений существует специальный синтаксис ярлыков, который принимает либо последовательность строк:

val usingColumns: Seq[String] = ???

a.join(b, usingColumns)

или как одна строка

val usingColumn: String = ???

a.join(b, usingColumn)

которые содержат только одну копию столбцов, используемых в условии соединения.

Ответ 3

Я застрял в этом некоторое время, и только недавно я придумал решение, что довольно легко.

Скажите, что a

scala> val a  = Seq(("a", 1), ("b", 2)).toDF("key", "vala")
a: org.apache.spark.sql.DataFrame = [key: string, vala: int]

scala> a.show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+
and 
scala> val b  = Seq(("a", 1)).toDF("key", "valb")
b: org.apache.spark.sql.DataFrame = [key: string, valb: int]

scala> b.show
+---+----+
|key|valb|
+---+----+
|  a|   1|
+---+----+

и я могу сделать это, чтобы выбрать только значение в dataframe a:

scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+

Ответ 4

Вы можете просто использовать это

df1.join(df2, Seq("ts","id"),"TYPE-OF-JOIN")

Здесь TYPE-OF-JOIN может быть

  • оставил
  • право
  • внутренний
  • fullouter

Например, у меня есть два кадра данных, как это:

// df1
word   count1
w1     10   
w2     15  
w3     20

// df2
word   count2
w1     100   
w2     150  
w5     200

Если вы присоединитесь к fullouter, то результат выглядит так

df1.join(df2, Seq("word"),"fullouter").show()

word   count1  count2
w1     10      100
w2     15      150
w3     20      null
w5     null    200

Ответ 5

Это нормальное поведение SQL, что я делаю для этого:

  • Отбрасывать или переименовывать исходные столбцы
  • Сделайте соединение
  • Переименовать столбцы, если есть

Здесь я заменяю столбец "fullname":

Некоторый код в Java:

this
    .sqlContext
    .read()
    .parquet(String.format("hdfs:///user/blablacar/data/year=%d/month=%d/day=%d", year, month, day))
    .drop("fullname")
    .registerTempTable("data_original");

this
    .sqlContext
    .read()
    .parquet(String.format("hdfs:///user/blablacar/data_v2/year=%d/month=%d/day=%d", year, month, day))
    .registerTempTable("data_v2");

 this
    .sqlContext
    .sql(etlQuery)
    .repartition(1)
    .write()
    .mode(SaveMode.Overwrite)
    .parquet(outputPath);

Если запрос:

SELECT
    d.*,
   concat_ws('_', product_name, product_module, name) AS fullname
FROM
    {table_source} d
LEFT OUTER JOIN
    {table_updates} u ON u.id = d.id

Это то, что вы можете сделать только с Spark, я верю (drop column из списка), очень полезно!

Ответ 7

попробуй это,

val df_combined = df1.join(df2, df1("ts") === df2("ts") && df1("id") === df2("id")).drop(df2("ts")).drop(df2("id"))

Ответ 8

Лучше всего перед тем, как присоединять их к именам столбцов в DF, и отбрасывать их соответствующим образом.

df1.columns = [id, возраст, доход] df2.column = [id, age_group]

df1.join(df2, on = df1.id == df2.id, how = 'inner'). write.saveAsTable('table_name')

//вернет ошибку, а ошибка для повторяющихся столбцов

//вместо этого попробуйте это

df1.join(df2.withColumnRenamed('id', 'id_2'), on = df1.id == df2.id_2, how = 'inner'). drop ('id_2')

Ответ 9

После объединения нескольких таблиц я запускаю их с помощью простой функции, чтобы переименовать столбцы в DF, если он встречается с дубликатами. Кроме того, вы также можете удалить эти дубликаты столбцов.

Где Names - это таблица со столбцами ['Id', 'Name', 'DateId', 'Description'], а Dates - это таблица со столбцами ['Id', 'Date', 'Description'], после объединения столбцы Id и Description будут дублированы.

Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = deDupeDfCols(NamesAndDates, '_')
NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")

Где deDupeDfCols определяется как:

def deDupeDfCols(df, separator=''):
    newcols = []

    for col in df.columns:
        if col not in newcols:
            newcols.append(col)
        else:
            for i in range(2, 1000):
                if (col + separator + str(i)) not in newcols:
                    newcols.append(col + separator + str(i))
                    break

    return df.toDF(*newcols)

Полученный фрейм данных будет содержать столбцы ['Id', 'Name', 'DateId', 'Description', 'Id2', 'Date', 'Description2'].

Извиняюсь за этот ответ на Python - я не знаком со Scala, но этот вопрос возник, когда я погуглил эту проблему, и я уверен, что код Scala не слишком отличается.