Подтвердить что ты не робот

Конкатенация столбцов в Apache Spark DataFrame

Как мы объединяем два столбца в Apache Spark DataFrame? Есть ли какая-либо функция в Spark SQL, которую мы можем использовать?

4b9b3361

Ответ 1

С помощью raw SQL вы можете использовать CONCAT:

  • В Python

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    
  • В Scala

    import sqlContext.implicits._
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    

С Spark 1.5.0 вы можете использовать функцию CONCAT с API DataFrame:

  • В Python:

    from pyspark.sql.functions import concat, col, lit
    
    df.select(concat(col("k"), lit(" "), col("v")))
    
  • В Scala:

    import org.apache.spark.sql.functions.{concat, lit}
    
    df.select(concat($"k", lit(" "), $"v"))
    

Существует также функция concat_ws, которая принимает разделитель строк в качестве первого аргумента.

Ответ 2

Здесь вы можете сделать индивидуальное именование

import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()

дает,

+--------+--------+
|colname1|colname2|
+--------+--------+
|   row11|   row12|
|   row21|   row22|
+--------+--------+

создать новый столбец путем конкатенации:

df = df.withColumn('joined_column', 
                    sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()

+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
|   row11|   row12|  row11_row12|
|   row21|   row22|  row21_row22|
+--------+--------+-------------+

Ответ 3

Один из вариантов конкатенации строковых столбцов в Spark Scala - использование concat.

Нужно проверить нулевые значения. Поскольку, если один из столбцов имеет значение null, результат будет нулевым, даже если один из других столбцов имеет информацию.

Использование concat и withColumn:

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

Используя concat и select:

val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")

В обоих подходах вы будете иметь NEW_COLUMN, значение которого представляет собой конкатенацию столбцов: COL1 и COL2 из вашего исходного df.

Ответ 4

Если вы хотите сделать это с помощью DF, вы можете использовать udf для добавления нового столбца на основе существующих столбцов.

val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)

//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))

//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )

//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()

Ответ 5

Вот еще один способ сделать это для pyspark:

#import concat and lit functions from pyspark.sql.functions 
from pyspark.sql.functions import concat, lit

#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])

#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))

#Show the new data frame
personDF.show()

----------RESULT-------------------------

84
+------------+
|East African|
+------------+
|   Ethiopian|
|      Kenyan|
|     Ugandan|
|     Rwandan|
+------------+

Ответ 6

В Spark 2.3 (SPARK-22771) Spark SQL поддерживает оператор конкатенации ||.

Например,

val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")

Ответ 7

Вот предложение, когда вы не знаете номер или имя столбцов в Dataframe.

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))

Ответ 8

В Spark 2.3.0 вы можете:

spark.sql( """ select '1' || column_a from table_a """)

Ответ 9

Другой способ сделать это в pySpark с помощью sqlContext...

#Suppose we have a dataframe:
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2'])

# Now we can concatenate columns and assign the new column a name 
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))

Ответ 10

В Java вы можете сделать это, чтобы объединить несколько столбцов. Образец кода - предоставить вам сценарий и как его использовать для лучшего понимания.

SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> reducedInventory = spark.sql("select * from table_name")
                        .withColumn("concatenatedCol",
                                concat(col("col1"), lit("_"), col("col2"), lit("_"), col("col3")));


class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder().config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}

Вышеуказанный код объединяет col1, col2, col3, разделенные "_", чтобы создать столбец с именем "concatenatedCol".

Ответ 11

В pyspark вы можете объединить два строковых столбца (используя lambdas) следующим образом:

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('concatCols').getOrCreate()

data = [('row11', 'row12'), ('row21','row22')]  
df = spark.createDataFrame(data, ['col1','col2'])

df.show()
+-----+-----+
| col1| col2|
+-----+-----+
|row11|row12|
|row21|row22|
+-----+-----+

from pyspark.sql.functions import col,udf
from pyspark.sql.types import StringType

funcConcatCols = udf(lambda x,y: x+'_'+y,StringType())

df = df.withColumn('col1 and col2',funcConcatCols(col('col1'),col('col2')))

df.show()
+-----+-----+-------------+
| col1| col2|col1 and col2|
+-----+-----+-------------+
|row11|row12|  row11_row12|
|row21|row22|  row21_row22|
+-----+-----+-------------+

Ответ 12

В самом деле, есть несколько прекрасных встроенных абстракций, которые помогут вам выполнить конкатенацию без необходимости реализации пользовательской функции. Так как вы упомянули Spark SQL, я предполагаю, что вы пытаетесь передать его как декларативную команду через spark.sql(). Если это так, вы можете выполнить прямую передачу SQL-команды, например: SELECT CONCAT(col1, '<delimiter>', col2, ...) AS concat_column_name FROM <table_name>;

Также в Spark 2.3.0 вы можете использовать команды в строках с: SELECT col1 || col2 AS concat_column_name FROM <table_name>;

Где ваш предпочитаемый разделитель (также может быть пустым пространством) и временная или постоянная таблица, из которой вы пытаетесь прочитать.