Подтвердить что ты не робот

Spark - Задача не сериализуема: как работать со сложными закрытиями карт, вызывающими внешние классы/объекты?

Взгляните на этот вопрос: Scala + Spark - Задача не сериализуема: java.io.NotSerializableExceptionon. При вызове функции снаружи закрывается только для классов, а не объектов.

Проблема:

Предположим, что мои mappers могут быть функциями (def), которые внутренне вызывают другие классы и создают объекты и делают разные вещи внутри. (Или они могут даже быть классами, которые расширяют (Foo) = > Bar, и выполняют обработку в методе их применения, но пусть теперь игнорируют этот случай)

Spark поддерживает только Java Serialization для закрытий. Есть ли выход из этого? Можем ли мы использовать что-то вместо закрытия, чтобы делать то, что я хочу сделать? Мы можем легко делать такие вещи с Hadoop. Это единственное, что делает Искра почти непригодной для меня. Нельзя ожидать, что все сторонние библиотеки будут иметь все классы, расширяющие Serializable!

Возможные решения:

Кажется, что-то похожее на это: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

Конечно, похоже, что оболочка - это ответ, но я не вижу точно, как это сделать.

4b9b3361

Ответ 1

Я понял, как это сделать сам!

Вам просто нужно сериализовать объекты перед прохождением через закрытие и затем де-сериализовать. Этот подход просто работает, даже если ваши классы не являются Serializable, потому что он использует Kryo за кулисами. Все, что вам нужно, это карри.;)

Вот пример того, как я это сделал:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

Не стесняйтесь сделать Blah настолько сложным, насколько вы хотите, класс, сопутствующий объект, вложенные классы, ссылки на несколько сторонних библиотек.

KryoSerializationWrapper ссылается на: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

Ответ 2

В случае использования Java API вы должны избегать анонимного класса при переходе к закрытию функции сопоставления. Вместо того, чтобы делать карту (новая функция), вам нужен класс, который расширяет вашу функцию и передает ее на карту (..) Видеть: https://yanago.wordpress.com/2015/03/21/apache-spark/