Подтвердить что ты не робот

Требовать сериализацию крио в Spark (Scala)

Я включил сериализацию крио:

conf.set( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" )

Я хочу убедиться, что пользовательский класс сериализуется с использованием kryo при перетасовке между узлами. Я могу зарегистрировать класс с помощью kryo следующим образом:

conf.registerKryoClasses(Array(classOf[Foo]))

Как я понимаю, это фактически не гарантирует, что используется сериализация kyro; если сериализатор недоступен, kryo вернется к сериализации Java.

Чтобы гарантировать сериализацию крио, я выполнил эту рекомендацию из документации Spark:

conf.set("spark.kryo.registrationRequired", "true")

Но это вызывает выброс IllegalArugmentException ( "Класс не зарегистрирован" ) для группы разных классов, которые я предполагаю, что Spark использует внутренне, например, следующее:

org.apache.spark.util.collection.CompactBuffer
scala.Tuple3

Конечно, мне не нужно вручную регистрировать каждый из этих отдельных классов с помощью kryo? Эти сериализаторы определены в kryo, так есть ли способ автоматически зарегистрировать их все?

4b9b3361

Ответ 1

Как я понимаю, это фактически не гарантирует, что используется сериализация kyro; если сериализатор недоступен, kryo вернется к сериализации Java.

Нет. Если вы установите spark.serializer на org.apache.spark.serializer. KryoSerializer, то Spark будет использовать Kryo. Если Kryo недоступен, вы получите сообщение об ошибке. Нет возврата.

Итак, что это за регистрация Крио?

Когда Kryo сериализует экземпляр незарегистрированного класса, он должен вывести полное имя класса. Это много персонажей. Вместо этого, если класс был предварительно зарегистрирован, Kryo может просто выводить числовую ссылку на этот класс, который составляет всего 1-2 байта.

Это особенно важно, когда каждая строка RDD сериализуется с помощью Kryo. Вы не хотите включать одно и то же имя класса для каждого из миллиардов строк. Поэтому вы предварительно зарегистрируете эти классы. Но легко забыть зарегистрировать новый класс, а затем вы снова теряете байты. Решение должно требовать регистрации каждого класса:

conf.set("spark.kryo.registrationRequired", "true")

Теперь Kryo никогда не будет выводить имена классов. Если он встречает незарегистрированный класс, это ошибка времени выполнения.

К сожалению, трудно перечислить все классы, которые вы собираетесь сериализовать заранее. Идея заключается в том, что Spark регистрирует классы, специфичные для Spark, и вы регистрируете все остальное. У вас есть RDD[(X, Y, Z)]? Вам необходимо зарегистрировать classOf[scala.Tuple3[_, _, _]].

Список классов классов, которые регистрируются Spark, фактически включает CompactBuffer, поэтому, если вы видите ошибку для этого, вы делаете что-то неправильно, Вы обходите процедуру регистрации Spark. Для регистрации классов вам необходимо использовать spark.kryo.classesToRegister или spark.kryo.registrator. (См. параметры конфигурации. Если вы используете GraphX, ваш регистратор должен называть GraphXUtils. registerKryoClasses.)

Ответ 2

Основываясь на том, что вы видите, лучше всего предположить, что вам не хватает утверждения:

sparkConf.set( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" )

В течение последних нескольких дней я также боролся с преобразованием сериализации в Kryo, в том числе для GraphX, включая регистрацию scala.Tuple3 с Kryo, по-видимому, потому что код Spark/GraphX ​​создает Tuple3, когда я делаю "sortBy".

Было добавлено множество других классов, один за другим, чтобы зарегистрироваться для регистрации в Kryo, в основном Scala и классах Spark, которые я бы не думал, что мне нужно будет добавить. Думать/надеяться, что лучше использовать Kryo с Spark.

Ответ 3

У меня есть метод, чтобы получить все имена классов, которые должны быть зарегистрированы быстро.

implicit class FieldExtensions(private val obj: Object) extends AnyVal {
    def readFieldAs[T](fieldName: String): T = {
        FieldUtils.readField(obj, fieldName, true).asInstanceOf[T]
    }

    def writeField(fieldName: String, value: Object): Unit = {
        FieldUtils.writeField(obj, fieldName, value, true)
    }
}

class LogClassResolver extends DefaultClassResolver {

    override def registerImplicit(t: Class[_]): Registration = {
        println(s"registerImplicitclasstype:${t.getName}")

        super.registerImplicit(t)
    }

    def copyFrom(resolver: DefaultClassResolver): Unit = {
        this.kryo = resolver.readFieldAs("kryo")

        this.idToRegistration.putAll(resolver.readFieldAs("idToRegistration"))
        this.classToRegistration.putAll(resolver.readFieldAs("classToRegistration"))
        this.classToNameId = resolver.readFieldAs("classToNameId")
        this.nameIdToClass = resolver.readFieldAs("nameIdToClass")
        this.nameToClass = resolver.readFieldAs("nameToClass")
        this.nextNameId = resolver.readFieldAs("nextNameId")

        this.writeField("memoizedClassId", resolver.readFieldAs("memoizedClassId"))
        this.writeField("memoizedClassIdValue", resolver.readFieldAs("memoizedClassIdValue"))
        this.writeField("memoizedClass", resolver.readFieldAs("memoizedClass"))
        this.writeField("memoizedClassValue", resolver.readFieldAs("memoizedClassValue"))
    }
}

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo): Unit = {
        val newResolver = new LogClassResolver
        newResolver.copyFrom(kryo.getClassResolver.asInstanceOf[DefaultClassResolver])
        FieldUtils.writeField(kryo, "classResolver", newResolver, true)
    }
}

И вам просто нужно зарегистрировать MyRegistrator в спарк-сессии.

val sparkSession = SparkSession.builder()
    .appName("Your_Spark_App")
    .config("spark.kryo.registrator", classOf[MyRegistrator].getTypeName)
    .getOrCreate()
    // all your spark logic will be added here

После этого отправьте небольшое примерное приложение spark в кластер, все имена классов, которые необходимо зарегистрировать, будут напечатаны на стандартный вывод. Затем следующая команда linux получит список имен классов:

yarn logs --applicationId {your_spark_app_id} | grep registerImplicitclasstype >> type_names.txt
sort -u type_names.txt

Затем зарегистрируйте все имя класса в вашем регистраторе: kryo.registser(Class.forName("имя класса"))

После этого вы можете добавить config("spark.kryo.registrationRequired", "true") в конф-конф. Иногда журналы пряжи могут быть потеряны, вы можете снова запустить вышеописанный процесс. PS: приведенный выше код работает для версии 2.1.2.

Наслаждаться.