Подтвердить что ты не робот

Spark задает несколько условий столбца для объединения данных

Как предоставить больше условий столбца при объединении двух фреймов данных. Например, я хочу запустить следующее:

val Lead_all = Leads.join(Utm_Master,  
    Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
    Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")

Я хочу присоединиться только тогда, когда совпадают эти столбцы. Но выше синтаксис недействителен, так как cols принимает только одну строку. Итак, как мне получить то, что я хочу.

4b9b3361

Ответ 1

В этом случае есть ассоциация Spark API столбца/выражения:

Leaddetails.join(
    Utm_Master, 
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
        && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
        && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
        && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
    "left"
)

Оператор <=> в этом примере означает тест на равномерность для нулевых значений.

Основное отличие от простого теста Equality (===) заключается в том, что первый безопасен для использования в случае, если один из столбцов может имеют нулевые значения.

Ответ 2

В версии Spark версии 1.5.0 (которая в настоящее время не издана) вы можете присоединиться к нескольким столбцам DataFrame. Обратитесь к SPARK-7990: добавьте методы, чтобы облегчить равноудаление нескольких ключей присоединения.

Python

Leads.join(
    Utm_Master, 
    ["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"],
    "left_outer"
)

Scala

Вопрос задан для ответа Scala, но я не использую Scala. Вот мое лучшее предположение....

Leads.join(
    Utm_Master,
    Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
    "left_outer"
)

Ответ 3

Одна вещь, которую вы можете сделать, это использовать raw SQL:

case class Bar(x1: Int, y1: Int, z1: Int, v1: String)
case class Foo(x2: Int, y2: Int, z2: Int, v2: String)

val bar = sqlContext.createDataFrame(sc.parallelize(
    Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") ::
    Bar(3, 1, 2, "bar") :: Nil))

val foo = sqlContext.createDataFrame(sc.parallelize(
    Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") ::
    Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil))

foo.registerTempTable("foo")
bar.registerTempTable("bar")

sqlContext.sql(
    "SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")

Ответ 4

В Pyspark вы можете просто указать каждое условие отдельно:

val Lead_all = Leads.join(Utm_Master,  
    (Leaddetails.LeadSource == Utm_Master.LeadSource) &
    (Leaddetails.Utm_Source == Utm_Master.Utm_Source) &
    (Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) &
    (Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))

Не забудьте правильно использовать операторы и скобки.

Ответ 5

Scala:

Leaddetails.join(
    Utm_Master, 
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
        && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
        && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
        && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
    "left"
)

Чтобы сделать регистр нечувствительным,

import org.apache.spark.sql.functions.{lower, upper}

то просто используйте lower(value) в условии метода соединения.

Например: dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))

Ответ 6

Опции === дают мне дублированные столбцы. Поэтому я использую Seq вместо этого.

val Lead_all = Leads.join(Utm_Master,
    Seq("Utm_Source","Utm_Medium","Utm_Campaign"),"left")

Конечно, это работает только тогда, когда имена соединяющихся столбцов совпадают.

Ответ 7

Spark SQL поддерживает соединение в кортеже столбцов, когда в круглых скобках, например

... WHERE (list_of_columns1) = (list_of_columns2)

что является способом, меньшим, чем указание равных выражений (=) для каждой пары столбцов, объединенных набором "AND" s.

Например:

SELECT a,b,c
FROM    tab1 t1
WHERE 
   NOT EXISTS
   (    SELECT 1
        FROM    t1_except_t2_df e
        WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
   )

вместо

SELECT a,b,c
FROM    tab1 t1
WHERE 
   NOT EXISTS
   (    SELECT 1
        FROM    t1_except_t2_df e
        WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c
   )

что менее читаемо, особенно если список столбцов большой, и вы хотите легко обращаться с NULL.

Ответ 8

Попробуйте это:

val rccJoin=dfRccDeuda.as("dfdeuda")
.join(dfRccCliente.as("dfcliente")
,col("dfdeuda.etarcid")===col("dfcliente.etarcid") 
&& col("dfdeuda.etarcid")===col("dfcliente.etarcid"),"inner")