Подтвердить что ты не робот

Как перенести RDD в Spark

У меня есть RDD:

1 2 3
4 5 6
7 8 9

Это матрица. Теперь я хочу перенести RDD следующим образом:

1 4 7
2 5 8
3 6 9

Как я могу это сделать?

4b9b3361

Ответ 1

Скажем, у вас есть матрица N & times; M.

Если оба N и M настолько малы, что вы можете удерживать элементы N и times; M в памяти, нет смысла использовать RDD. Но перенос легко:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val transposed = sc.parallelize(rdd.collect.toSeq.transpose)

Если N или M настолько велики, что вы не можете удерживать N или M записей в памяти, то вы не можете иметь RDD-строку такого размера. В этом случае невозможно представить исходную или транспонированную матрицу.

N и M могут иметь средний размер: вы можете хранить N или M записей в памяти, но вы не можете удерживать N и times; M записи. В этом случае вам нужно взорвать матрицу и снова собрать ее:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
// Split the matrix into one number per line.
val byColumnAndRow = rdd.zipWithIndex.flatMap {
  case (row, rowIndex) => row.zipWithIndex.map {
    case (number, columnIndex) => columnIndex -> (rowIndex, number)
  }
}
// Build up the transposed matrix. Group and sort by column index first.
val byColumn = byColumnAndRow.groupByKey.sortByKey().values
// Then sort by row index.
val transposed = byColumn.map {
  indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
}

Ответ 2

Первый черновик без использования collect(), поэтому все работает со стороны рабочего и ничего не делается на драйвере:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))

rdd.flatMap(row => (row.map(col => (col, row.indexOf(col))))) // flatMap by keeping the column position
   .map(v => (v._2, v._1)) // key by column position
   .groupByKey.sortByKey   // regroup on column position, thus all elements from the first column will be in the first row
   .map(_._2)              // discard the key, keep only value

Проблема с этим решением заключается в том, что столбцы в транспонированной матрице будут перетасованы, если операция выполняется в распределенной системе. Подумайте о улучшенной версии

Моя идея заключается в том, что помимо прикрепления "номера столбца" к каждому элементу матрицы мы также добавляем "номер строки". Таким образом, мы могли бы зайти по позиции столбца и перегруппироваться по ключу, как в примере, но затем мы могли бы изменить порядок каждой строки на номере строки, а затем стянуть строки/столбцы из результата. У меня просто нет способа узнать номер строки при импорте файла в RDD.

Вам может показаться, что тяжело прикрепить столбец и номер строки к каждому матричному элементу, но я думаю, что цена, которую нужно заплатить, имеет возможность обрабатывать ваш вход как куски распределенным способом и таким образом обрабатывать огромные матрицы.

Будет обновлен ответ, когда я найду решение проблемы с упорядочением.

Ответ 3

С Spark 1.6 вы можете использовать pivot operation в DataFrames, в зависимости от фактической формы ваших данных, если вы поместите ее в DF вы можете сворачивать столбцы в строки, следующий блог databricks очень полезен, так как он подробно описывает несколько вариантов использования с примерами кода