Подтвердить что ты не робот

Как преобразовать столбец массива (т.е. списка) в вектор

Короткий вариант вопроса!

Рассмотрим следующий фрагмент кода (если spark уже установлен в некоторый SparkSession):

from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

Обратите внимание, что поле температур представляет собой список поплавков. Я хотел бы преобразовать эти списки float в тип MLlib Vector, и я хотел бы, чтобы это преобразование выражалось с использованием базового API DataFrame, а не через RDD (что неэффективно, поскольку оно отправляет все данные из JVM для Python, обработка выполняется на Python, мы не получаем преимуществ оптимизатора искры Catalyst, yada yada). Как мне это сделать? В частности:

  • Есть ли способ получить прямую работу? Подробнее см. Ниже (и неудачная попытка обходного пути)? Или, есть ли другая операция, которая повлияла на меня?
  • Что более эффективно из двух альтернативных решений, которые я предлагаю ниже (UDF против взрыва/повторной сборки элементов в списке)? Или есть какие-то другие почти, но не совсем правильные альтернативы, которые лучше, чем любой из них?

Не работает прямая трансляция

Это то, что я ожидал бы как "правильное" решение. Я хочу преобразовать тип столбца из одного типа в другой, поэтому я должен использовать листинг. В качестве некоторого контекста позвольте мне напомнить вам об обычном способе передать его другому типу:

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
)

Теперь, например, df_with_strings.collect()[0]["temperatures"][1] '-7.0'. Но если я бросаю в ml Vector, все происходит не так хорошо:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

Это дает ошибку:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to [email protected];;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

Хлоп! Любые идеи, как это исправить?

Возможные альтернативы

Альтернатива 1: Использование VectorAssembler

Существует Transformer, который кажется почти идеальным для этой работы: VectorAssembler. Он принимает один или несколько столбцов и объединяет их в один вектор. К сожалению, для этого нужны столбцы Vector и Float, а не Array, поэтому следующее не работает:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)

Он дает эту ошибку:

pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

Лучшая работа, о которой я могу думать, состоит в том, чтобы взорвать список в несколько столбцов, а затем использовать VectorAssembler, чтобы снова собрать их обратно:

from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], 
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"], 
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

Кажется, что это будет идеально, за исключением того, что TEMPERATURE_COUNT будет больше 100, а иногда и больше 1000. (Еще одна проблема заключается в том, что код будет более сложным, если вы не знаете размер массива в заранее, хотя это не относится к моим данным.) Действительно ли Spark создает промежуточный набор данных с таким количеством столбцов или просто рассматривает этот промежуточный шаг, который отдельные элементы проходят через переходный период (или действительно оптимизирует этот шаг полностью, когда видит, что единственное использование этих столбцов должно быть собрано в вектор)?

Альтернатива 2: используйте UDF

Более простая альтернатива - использовать UDF для преобразования. Это позволяет мне прямо сказать, что я хочу делать в одной строке кода, и не требует создания набора данных с сумасшедшим числом столбцов. Но все эти данные должны быть обменены между Python и JVM, и каждый отдельный номер должен обрабатываться Python (который, как известно, медленный для итерации по отдельным элементам данных). Вот как это выглядит:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"], 
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)

Незначительные замечания

Остальные разделы этого бессвязного вопроса - некоторые дополнительные вещи, которые я придумал, пытаясь найти ответ. Большинство людей читают это.

Не решение: используйте Vector, чтобы начать с

В этом тривиальном примере для начала можно создать данные с использованием векторного типа, но, конечно, мои данные не являются списком Python, который я распараллеливаю, но вместо этого считывается из источника данных. Но для записи, вот как это выглядело бы:

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)

Неэффективное решение: используйте map()

Одна из возможностей заключается в использовании метода RDD map() для преобразования списка в Vector. Это похоже на идею UDF, за исключением того, что ее еще хуже, потому что стоимость сериализации и т.д. Приходится на все поля в каждой строке, а не только на одну из них. Для записи здесь должно выглядеть следующее решение:

df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"], 
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()

Неудачная попытка обходного пути для cast

В отчаянии я заметил, что Vector представляется внутри структурой с четырьмя полями, но использование традиционного приведения из этого типа структуры тоже не работает. Вот иллюстрация (где я построил структуру с помощью udf, но udf не является важной частью):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"], 
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"], 
    df_almost_vector["temperatures"].cast(VectorUDT())
)

Это дает ошибку:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to [email protected];;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
4b9b3361

Ответ 1

Лично я бы пошел с Python UDF и не стал бы беспокоиться ни о чем другом:

Но если вы действительно хотите, чтобы другие варианты здесь, вы:

  • Scala UDF с оболочкой Python:

    Установите sbt, следуя инструкциям на сайте проекта.

    Создайте пакет Scala со следующей структурой:

    .
    ├── build.sbt
    └── udfs.scala
    

    Изменить build.sbt (настроить, чтобы отразить версию Scala и Spark):

    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.1.0",
      "org.apache.spark" %% "spark-mllib" % "2.1.0"
    )
    

    Изменить udfs.scala:

    package com.example.spark.udfs
    
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector
    
    object udfs {
      val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }
    

    Пакет:

    sbt package
    

    и включить (или эквивалент в зависимости от версии Scala):

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
    

    в качестве аргумента для --driver-class-path при запуске оболочки/отправке приложения.

    В PySpark определите оболочку:

    from pyspark.sql.column import _to_java_column, _to_seq, Column
    from pyspark import SparkContext
    
    def as_vector(col):
        sc = SparkContext.getOrCreate()
        f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
        return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
    

    Тест:

    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_vec.printSchema()
    
     
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- vector: vector (nullable = true)
    
  • Получите данные в формате JSON, отражающем схему DenseVector, и прочитайте их обратно:

    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT
    
    json_vec = to_json(struct(struct(
        lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
        col("temperatures").alias("values")
    ).alias("v")))
    
    schema = StructType([StructField("v", VectorUDT())])
    
    with_parsed_vector = df.withColumn(
        "parsed_vector", from_json(json_vec, schema).getItem("v")
    )
    
    with_parsed_vector.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_parsed_vector.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)
    

Ответ 2

У меня была такая же проблема, как у вас, и я сделал это так. Этот способ включает преобразование СДР, поэтому не критичен к производительности, но он работает.

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])

new_df

результат -

DataFrame[city: string, temperatures: vector]