У меня есть RDD:
1 2 3
4 5 6
7 8 9
Это матрица. Теперь я хочу перенести RDD следующим образом:
1 4 7
2 5 8
3 6 9
Как я могу это сделать?
У меня есть RDD:
1 2 3
4 5 6
7 8 9
Это матрица. Теперь я хочу перенести RDD следующим образом:
1 4 7
2 5 8
3 6 9
Как я могу это сделать?
Скажем, у вас есть матрица N & times; M.
Если оба N и M настолько малы, что вы можете удерживать элементы N и times; M в памяти, нет смысла использовать RDD. Но перенос легко:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val transposed = sc.parallelize(rdd.collect.toSeq.transpose)
Если N или M настолько велики, что вы не можете удерживать N или M записей в памяти, то вы не можете иметь RDD-строку такого размера. В этом случае невозможно представить исходную или транспонированную матрицу.
N и M могут иметь средний размер: вы можете хранить N или M записей в памяти, но вы не можете удерживать N и times; M записи. В этом случае вам нужно взорвать матрицу и снова собрать ее:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
// Split the matrix into one number per line.
val byColumnAndRow = rdd.zipWithIndex.flatMap {
case (row, rowIndex) => row.zipWithIndex.map {
case (number, columnIndex) => columnIndex -> (rowIndex, number)
}
}
// Build up the transposed matrix. Group and sort by column index first.
val byColumn = byColumnAndRow.groupByKey.sortByKey().values
// Then sort by row index.
val transposed = byColumn.map {
indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
}
Первый черновик без использования collect(), поэтому все работает со стороны рабочего и ничего не делается на драйвере:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
rdd.flatMap(row => (row.map(col => (col, row.indexOf(col))))) // flatMap by keeping the column position
.map(v => (v._2, v._1)) // key by column position
.groupByKey.sortByKey // regroup on column position, thus all elements from the first column will be in the first row
.map(_._2) // discard the key, keep only value
Проблема с этим решением заключается в том, что столбцы в транспонированной матрице будут перетасованы, если операция выполняется в распределенной системе. Подумайте о улучшенной версии
Моя идея заключается в том, что помимо прикрепления "номера столбца" к каждому элементу матрицы мы также добавляем "номер строки". Таким образом, мы могли бы зайти по позиции столбца и перегруппироваться по ключу, как в примере, но затем мы могли бы изменить порядок каждой строки на номере строки, а затем стянуть строки/столбцы из результата. У меня просто нет способа узнать номер строки при импорте файла в RDD.
Вам может показаться, что тяжело прикрепить столбец и номер строки к каждому матричному элементу, но я думаю, что цена, которую нужно заплатить, имеет возможность обрабатывать ваш вход как куски распределенным способом и таким образом обрабатывать огромные матрицы.
Будет обновлен ответ, когда я найду решение проблемы с упорядочением.
С Spark 1.6 вы можете использовать pivot operation в DataFrames, в зависимости от фактической формы ваших данных, если вы поместите ее в DF вы можете сворачивать столбцы в строки, следующий блог databricks очень полезен, так как он подробно описывает несколько вариантов использования с примерами кода