Короткий вариант вопроса!
Рассмотрим следующий фрагмент кода (если spark
уже установлен в некоторый SparkSession
):
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
Обратите внимание, что поле температур представляет собой список поплавков. Я хотел бы преобразовать эти списки float в тип MLlib Vector
, и я хотел бы, чтобы это преобразование выражалось с использованием базового API DataFrame
, а не через RDD (что неэффективно, поскольку оно отправляет все данные из JVM для Python, обработка выполняется на Python, мы не получаем преимуществ оптимизатора искры Catalyst, yada yada). Как мне это сделать? В частности:
- Есть ли способ получить прямую работу? Подробнее см. Ниже (и неудачная попытка обходного пути)? Или, есть ли другая операция, которая повлияла на меня?
- Что более эффективно из двух альтернативных решений, которые я предлагаю ниже (UDF против взрыва/повторной сборки элементов в списке)? Или есть какие-то другие почти, но не совсем правильные альтернативы, которые лучше, чем любой из них?
Не работает прямая трансляция
Это то, что я ожидал бы как "правильное" решение. Я хочу преобразовать тип столбца из одного типа в другой, поэтому я должен использовать листинг. В качестве некоторого контекста позвольте мне напомнить вам об обычном способе передать его другому типу:
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
Теперь, например, df_with_strings.collect()[0]["temperatures"][1]
'-7.0'
. Но если я бросаю в ml Vector, все происходит не так хорошо:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
Это дает ошибку:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to [email protected];;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"
Хлоп! Любые идеи, как это исправить?
Возможные альтернативы
Альтернатива 1: Использование VectorAssembler
Существует Transformer
, который кажется почти идеальным для этой работы: VectorAssembler
. Он принимает один или несколько столбцов и объединяет их в один вектор. К сожалению, для этого нужны столбцы Vector
и Float
, а не Array
, поэтому следующее не работает:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)
Он дает эту ошибку:
pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'
Лучшая работа, о которой я могу думать, состоит в том, чтобы взорвать список в несколько столбцов, а затем использовать VectorAssembler
, чтобы снова собрать их обратно:
from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
outputCol="temperature_vector"
)
df_exploded = df.select(
df["city"],
*[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")
Кажется, что это будет идеально, за исключением того, что TEMPERATURE_COUNT
будет больше 100, а иногда и больше 1000. (Еще одна проблема заключается в том, что код будет более сложным, если вы не знаете размер массива в заранее, хотя это не относится к моим данным.) Действительно ли Spark создает промежуточный набор данных с таким количеством столбцов или просто рассматривает этот промежуточный шаг, который отдельные элементы проходят через переходный период (или действительно оптимизирует этот шаг полностью, когда видит, что единственное использование этих столбцов должно быть собрано в вектор)?
Альтернатива 2: используйте UDF
Более простая альтернатива - использовать UDF для преобразования. Это позволяет мне прямо сказать, что я хочу делать в одной строке кода, и не требует создания набора данных с сумасшедшим числом столбцов. Но все эти данные должны быть обменены между Python и JVM, и каждый отдельный номер должен обрабатываться Python (который, как известно, медленный для итерации по отдельным элементам данных). Вот как это выглядит:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
df["city"],
list_to_vector_udf(df["temperatures"]).alias("temperatures")
)
Незначительные замечания
Остальные разделы этого бессвязного вопроса - некоторые дополнительные вещи, которые я придумал, пытаясь найти ответ. Большинство людей читают это.
Не решение: используйте Vector
, чтобы начать с
В этом тривиальном примере для начала можно создать данные с использованием векторного типа, но, конечно, мои данные не являются списком Python, который я распараллеливаю, но вместо этого считывается из источника данных. Но для записи, вот как это выглядело бы:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)
Неэффективное решение: используйте map()
Одна из возможностей заключается в использовании метода RDD map()
для преобразования списка в Vector
. Это похоже на идею UDF, за исключением того, что ее еще хуже, потому что стоимость сериализации и т.д. Приходится на все поля в каждой строке, а не только на одну из них. Для записи здесь должно выглядеть следующее решение:
df_with_vectors = df.rdd.map(lambda row: Row(
city=row["city"],
temperatures=Vectors.dense(row["temperatures"])
)).toDF()
Неудачная попытка обходного пути для cast
В отчаянии я заметил, что Vector
представляется внутри структурой с четырьмя полями, но использование традиционного приведения из этого типа структуры тоже не работает. Вот иллюстрация (где я построил структуру с помощью udf, но udf не является важной частью):
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
df["city"],
list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
df_almost_vector["city"],
df_almost_vector["temperatures"].cast(VectorUDT())
)
Это дает ошибку:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to [email protected];;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"