Подтвердить что ты не робот

Разбить (транспонировать?) Несколько столбцов в таблице SQL Spark

Я использую Spark SQL (я упоминаю, что он находится в Spark в случае, если это влияет на синтаксис SQL - я еще недостаточно знаком, чтобы быть уверенным), и у меня есть таблица, которую я пытаюсь переструктурировать, но Я застрял, пытаясь транспонировать несколько столбцов одновременно.

В основном у меня есть данные, которые выглядят так:

userId    someString      varA     varB
   1      "example1"    [0,2,5]   [1,2,9]
   2      "example2"    [1,20,5]  [9,null,6]

и я хотел бы одновременно взорвать оба varA и varB (длина всегда будет согласованной), так что конечный результат выглядит следующим образом:

userId    someString      varA     varB
   1      "example1"       0         1
   1      "example1"       2         2
   1      "example1"       5         9
   2      "example2"       1         9
   2      "example2"       20       null
   2      "example2"       5         6

но я могу только представить, что один оператор explode (var) работает в одной команде, и если я попытаюсь связать их (т.е. создать временную таблицу после первой команды explode), то я, очевидно, получаю огромное количество повторяющиеся, ненужные строки.

Большое спасибо!

4b9b3361

Ответ 1

То, что вы хотите, невозможно без пользовательского UDF. В Scala вы можете сделать что-то вроде этого:

val data = sc.parallelize(Seq(
    """{"userId": 1, "someString": "example1",
        "varA": [0, 2, 5], "varB": [1, 2, 9]}""",
    """{"userId": 2, "someString": "example2",
        "varA": [1, 20, 5], "varB": [9, null, 6]}"""
))

val df = sqlContext.read.json(data)

df.printSchema
// root
//  |-- someString: string (nullable = true)
//  |-- userId: long (nullable = true)
//  |-- varA: array (nullable = true)
//  |    |-- element: long (containsNull = true)
//  |-- varB: array (nullable = true)
//  |    |-- element: long (containsNull = true)

Теперь мы можем определить zip udf:

import org.apache.spark.sql.functions.{udf, explode}

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
   $"userId", $"someString",
   $"vars._1".alias("varA"), $"vars._2".alias("varB")).show

// +------+----------+----+----+
// |userId|someString|varA|varB|
// +------+----------+----+----+
// |     1|  example1|   0|   1|
// |     1|  example1|   2|   2|
// |     1|  example1|   5|   9|
// |     2|  example2|   1|   9|
// |     2|  example2|  20|null|
// |     2|  example2|   5|   6|
// +------+----------+----+----+

С сырым SQL:

sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.registerTempTable("df")

sqlContext.sql(
  """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")