Подтвердить что ты не робот

Добавление двух RDD [mllib.linalg.Vector]

Мне нужно добавить две матрицы, которые хранятся в двух файлах.

Содержимое latest1.txt и latest2.txt имеет следующую строку:

1 2 3
4 5 6
7 8 9

Я читаю эти файлы следующим образом:

scala> val rows = sc.textFile("latest1.txt").map { line => val values = line.split(‘ ‘).map(_.toDouble)
    Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}

scala> val r1 = rows
r1: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14

scala> val rows = sc.textFile("latest2.txt").map { line => val values = line.split(‘ ‘).map(_.toDouble)
    Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}

scala> val r2 = rows
r2: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14

Я хочу добавить r1, r2. Итак, есть ли способ добавить эти два RDD[mllib.linalg.Vector] в Apache-Spark.

4b9b3361

Ответ 1

Это действительно хороший вопрос. Я регулярно работаю с mllib и не понимаю, что эти основные операции с линейной алгеброй недоступны.

Дело в том, что базовые векторы breeze имеют все манипуляции с линейной алгеброй, которые вы ожидаете, в том числе, конечно, базовое добавление, которое вы конкретно упомянули.

Однако реализация бриза скрыта от внешнего мира через:

[private mllib]

Итак, с точки зрения внешнего мира/публичного API, как мы получаем доступ к этим примитивам?

Некоторые из них уже выставлены: например. сумма квадратов:

/**
 * Returns the squared distance between two Vectors.
 * @param v1 first Vector.
 * @param v2 second Vector.
 * @return squared distance between two Vectors.
 */
def sqdist(v1: Vector, v2: Vector): Double = { 
  ...
}

Однако выбор таких доступных методов ограничен и фактически не включает в себя основные операции, включая элементное добавление, вычитание, умножение и т.д.

Итак, вот лучшее, что я мог видеть:

  • Преобразовать векторы на ветер:
  • Выполнение векторных операций на ветру
  • Преобразование назад с бриза на mllib Vector

Вот пример кода:

val v1 = Vectors.dense(1.0, 2.0, 3.0)
val v2 = Vectors.dense(4.0, 5.0, 6.0)
val bv1 = new DenseVector(v1.toArray)
val bv2 = new DenseVector(v2.toArray)

val vectout = Vectors.dense((bv1 + bv2).toArray)
vectout: org.apache.spark.mllib.linalg.Vector = [5.0,7.0,9.0]

Ответ 2

В следующем коде представлены методы "какBreeze" и "Breeze" из Spark. Это решение поддерживает SparseVector в отличие от использования vector.toArray. Обратите внимание, что Spark может изменить свой API в будущем и уже переименовал toBreeze в asBreeze.

package org.apache.spark.mllib.linalg
import breeze.linalg.{Vector => BV}
import org.apache.spark.sql.functions.udf

/** expose vector.toBreeze and Vectors.fromBreeze
  */
object VectorUtils {

  def fromBreeze(breezeVector: BV[Double]): Vector = {
    Vectors.fromBreeze( breezeVector )
  }

  def asBreeze(vector: Vector): BV[Double] = {
    // this is vector.asBreeze in Spark 2.0
    vector.toBreeze
  }

  val addVectors = udf {
    (v1: Vector, v2: Vector) => fromBreeze( asBreeze(v1) + asBreeze(v2) )
  }

}

С помощью этого можно сделать df.withColumn("xy", addVectors($"x", $"y")).