Подтвердить что ты не робот

Как назначить уникальные смежные числа элементам в Spark RDD

У меня есть набор данных (user, product, review), и я хочу передать его в алгоритм ALS mllib.

Алгоритм требует, чтобы пользователи и продукты были числами, а мои - именами строк и строковыми SKU.

Сейчас я получаю отдельных пользователей и SKU, а затем присваиваю им числовые идентификаторы вне Spark.

Мне было интересно, есть ли лучший способ сделать это. Один из подходов, о котором я думал, заключается в написании настраиваемого RDD, который по существу перечисляет от 1 до n, а затем вызывает zip на двух RDD.

4b9b3361

Ответ 1

Начиная с Spark 1.0 есть два метода, которые вы можете легко решить:

  • RDD.zipWithIndex похож на Seq.zipWithIndex, он добавляет смежные (Long) числа. Это нужно сначала подсчитать элементы в каждом разделе, поэтому ваш ввод будет оценен дважды. Кэш вашего входного RDD, если вы хотите использовать это.
  • RDD.zipWithUniqueId также дает вам уникальные идентификаторы Long, но они не гарантированно смежны. (Они будут только смежными, если каждый раздел имеет одинаковое количество элементов.) Потенциал заключается в том, что это не нужно ничего знать о вводе, поэтому он не будет вызывать двойную оценку.

Ответ 2

Для аналогичного примера использования, я просто хэшировал строковые значения. См. http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/

def nnHash(tag: String) = tag.hashCode & 0x7FFFFF
var tagHashes = postIDTags.map(_._2).distinct.map(tag =>(nnHash(tag),tag))

Похоже, вы уже делаете что-то подобное, хотя хэширование может быть проще в управлении.

Matei предложил здесь подход к эмуляции zipWithIndex на RDD, что сводится к назначению идентификаторов внутри каждой части, которые будут глобально уникальными: https://groups.google.com/forum/#!topic/spark-users/WxXvcn2gl1E

Ответ 3

Другим простым вариантом, если использовать DataFrames и просто обеспокоен уникальностью, является использование функции MonotonicallyIncreasingID

import org.apache.spark.sql.functions.monotonicallyIncreasingId 
val newDf = df.withColumn("uniqueIdColumn", monotonicallyIncreasingId)

Изменить: MonotonicallyIncreasingID устарел и удален начиная с Spark 2.0; он теперь известен как monotonically_increasing_id.

Ответ 4

monotonically_increasing_id() представляется ответом, но, к сожалению, он не будет работать для ALS, поскольку он производит 64-битные номера, и ALS ожидает 32 (см. мой комментарий ниже radek1st ответ для deets).

Решение, которое я нашел, заключается в использовании zipWithIndex(), как указано в ответе Дарабоса. Вот как это реализовать:

Если у вас уже есть один столбцовый DataFrame с вашими явными пользователями с именем userids, вы можете создать таблицу поиска (LUT) следующим образом:

# PySpark code
user_als_id_LUT = sqlContext.createDataFrame(userids.rdd.map(lambda x: x[0]).zipWithIndex(), StructType([StructField("userid", StringType(), True),StructField("user_als_id", IntegerType(), True)]))

Теперь вы можете:

  • Используйте этот LUT для получения ALS-дружественных идентификаторов целого числа, чтобы предоставить ALS
  • Используйте этот LUT для обратного поиска, когда вам нужно вернуться с идентификатора ALS к исходному идентификатору

Сделайте то же самое для элементов.

Ответ 5

Люди уже рекомендовали monotonically_increasing_id() и упомянули проблему, что он создает Longs, а не Ints.

Однако, по моему опыту (caveat - Spark 1.6) - если вы используете его на одном исполнителе (перераспределение до 1 раньше), нет префикса исполнителя, и его можно безопасно перевести в Int. Очевидно, что вам нужно иметь меньше строк Integer.MAX_VALUE.