Подтвердить что ты не робот

Матричное умножение в Apache Spark

Я пытаюсь выполнить матричное умножение с помощью Apache Spark и Java.

У меня есть два основных вопроса:

  • Как создать RDD, который может представлять матрицу в Apache Spark?
  • Как умножить два таких RDD?
4b9b3361

Ответ 1

Все зависит от входных данных и размеров, но, как правило, вы не являетесь RDD, а одной из распределенных структур данных из org.apache.spark.mllib.linalg.distributed. В настоящий момент он предоставляет четыре различные реализации DistributedMatrix

  • IndexedRowMatrix - можно создать непосредственно из RDD[IndexedRow], где IndexedRow состоят из индекса строки и org.apache.spark.mllib.linalg.Vector

    import org.apache.spark.mllib.linalg.{Vectors, Matrices}
    import org.apache.spark.mllib.linalg.distributed.{IndexedRowMatrix,
      IndexedRow}
    
    val rows =  sc.parallelize(Seq(
      (0L, Array(1.0, 0.0, 0.0)),
      (0L, Array(0.0, 1.0, 0.0)),
      (0L, Array(0.0, 0.0, 1.0)))
    ).map{case (i, xs) => IndexedRow(i, Vectors.dense(xs))}
    
    val indexedRowMatrix = new IndexedRowMatrix(rows)
    
  • RowMatrix - аналогично IndexedRowMatrix, но без значимых индексов строк. Может быть создан непосредственно из RDD[org.apache.spark.mllib.linalg.Vector]

    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    
    val rowMatrix = new RowMatrix(rows.map(_.vector))      
    
  • BlockMatrix - может быть создан из RDD[((Int, Int), Matrix)], где первый элемент кортежа содержит координаты блока, а второй - локальный org.apache.spark.mllib.linalg.Matrix

    val eye = Matrices.sparse(
      3, 3, Array(0, 1, 2, 3), Array(0, 1, 2), Array(1, 1, 1))
    
    val blocks = sc.parallelize(Seq(
       ((0, 0), eye), ((1, 1), eye), ((2, 2), eye)))
    
    val blockMatrix = new BlockMatrix(blocks, 3, 3, 9, 9)
    
  • CoordinateMatrix - может быть создан из RDD[MatrixEntry], где MatrixEntry состоит строки, столбца и значения.

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,
      MatrixEntry}
    
    val entries = sc.parallelize(Seq(
       (0, 0, 3.0), (2, 0, -5.0), (3, 2, 1.0),
       (4, 1, 6.0), (6, 2, 2.0), (8, 1, 4.0))
    ).map{case (i, j, v) => MatrixEntry(i, j, v)}
    
    val coordinateMatrix = new CoordinateMatrix(entries, 9, 3)
    

Первые две реализации поддерживают умножение на локальный Matrix:

val localMatrix = Matrices.dense(3, 2, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))

indexedRowMatrix.multiply(localMatrix).rows.collect
// Array(IndexedRow(0,[1.0,4.0]), IndexedRow(0,[2.0,5.0]),
//   IndexedRow(0,[3.0,6.0]))

а третий может быть умножен на другой BlockMatrix, если количество столбцов на блок в этой матрице соответствует числу строк на блок другой матрицы. CoordinateMatrix не поддерживает умножения, но довольно легко создать и преобразовать в другие типы распределенных матриц:

blockMatrix.multiply(coordinateMatrix.toBlockMatrix(3, 3))

Каждый тип имеет свои сильные и слабые стороны, и есть некоторые дополнительные факторы, которые следует учитывать при использовании разреженных или плотных элементов (Vectors или block Matrices). Умножение на локальную матрицу обычно предпочтительнее, поскольку она не требует дорогого перетасовки.

Более подробную информацию о каждом типе в руководстве по типам данных MLlib.