Подтвердить что ты не робот

Решение проблем зависимостей в Apache Spark

Общие проблемы при создании и развертывании приложений Spark:

  • java.lang.ClassNotFoundException.
  • object x is not a member of package y ошибки компиляции.
  • java.lang.NoSuchMethodError

Как это можно решить?

4b9b3361

Ответ 1

При создании и развертывании приложений Spark все зависимости требуют совместимых версий.

  • Scala версия. Все пакеты должны использовать те же основные версии (2.10, 2.11, 2.12) Scala.

    Рассмотрим следующий (неверный) build.sbt:

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    Мы используем spark-streaming для Scala 2.10, а остальные пакеты для Scala 2.11. Файл действительный может быть

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.11" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    но лучше указать версию по всему миру и использовать %%:

    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.11.7"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % "2.0.1",
       "org.apache.spark" %% "spark-streaming" % "2.0.1",
       "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1"
    )
    

    Аналогично в Maven:

    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
    
  • Исправленная версия. Все пакеты должны использовать ту же самую основную версию Spark (1.6, 2.0, 2.1,...).

    Рассмотрим следующий (неверный) файл build.sbt:

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "1.6.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    Мы используем spark-core 1.6, а остальные компоненты - в Spark 2.0. Файл действительный может быть

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    но лучше использовать переменную:

    name := "Simple Project"
    
    version := "1.0"
    
    val sparkVersion = "2.0.1"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % sparkVersion,
       "org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion
    )
    

    Аналогично в Maven:

    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
        <scala.version>2.11</scala.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
    
  • Версия Spark, используемая в зависимостях Spark, должна соответствовать версии Spark установки Spark. Например, если вы используете 1.6.1 в кластере, вы должны использовать 1.6.1 для сборки банок. Несоответствие незначительных версий не всегда принимается.

  • Scala версия, используемая для сборки jar, должна соответствовать версии Scala, используемой для создания развернутого Spark. По умолчанию (загружаемые двоичные файлы и сборки по умолчанию):

    • Искра 1.x → Scala 2.10
    • Искра 2.x → Scala 2.11
  • Дополнительные пакеты должны быть доступны на рабочих узлах, если они включены в жировую банку. Существует множество опций, включая:

    • --jars аргумент для spark-submit - для распространения локальных файлов jar.
    • --packages аргумент для spark-submit - для извлечения зависимостей из репозитория Maven.

    При отправке в кластер node вы должны включить приложение jar в --jars.

Ответ 2

Путь классов Apache Spark построен динамически (для размещения кода пользователя для каждого приложения), что делает его уязвимым для таких проблем. @user7337271 ответ правильный, но есть еще несколько проблем, в зависимости от используемого вами менеджера кластера ("мастер").

Во-первых, приложение Spark состоит из этих компонентов (каждый из них является отдельным JVM, поэтому потенциально содержит разные классы в его пути к классам):

  1. Драйвер: ваше приложение, создающее SparkSession (или SparkContext) и подключение к диспетчеру кластера для выполнения фактической работы
  2. Диспетчер кластеров: служит в качестве "точки входа" в кластер, отвечающий за распределение исполнителей для каждого приложения. В Spark есть несколько различных типов: автономный, YARN и Mesos, о которых мы расскажем ниже.
  3. Исполнители: это процессы на узлах кластера, выполнение фактической работы (запуск задач Spark)

Соотношение между ними описано на этой диаграмме из обзора режима кластера Apache Spark:

Cluster Mode Overview

Теперь - какие классы должны находиться в каждом из этих компонентов?

На это можно ответить на следующую диаграмму:

Class placement overview

Пусть разберётся так медленно:

  1. Spark Code - это библиотеки Spark. Они должны существовать во всех трех компонентах, так как они включают в себя клей, который позволяет Spark выполнять связь между ними. Кстати, авторы Spark внесли дизайнерское решение включить код для ВСЕХ компонентов во ВСЕХ компонентах (например, включить код, который должен работать только в Executor в драйвере тоже), чтобы упростить это - так что Spark "fat jar" (в версиях до 1.6 ) или "архив" (в версии 2.0, ниже) содержится необходимый код для всех компонентов и должен быть доступен во всех них.

  2. Код только для драйверов - это код пользователя, который не содержит ничего, что должно использоваться для исполнителей, то есть код, который не используется при каких-либо преобразованиях в RDD/DataFrame/Dataset. Это необязательно должно быть отделено от распределенного кода пользователя, но это может быть.

  3. Распределенный код - это код пользователя, который скомпилирован с кодом драйвера, но также должен выполняться на исполнителях - все используемые фактические преобразования должны быть включены в эту банку.

Теперь, когда мы получили это прямо, как мы получаем, чтобы классы правильно загружались в каждом компоненте, и какими правилами они должны следовать?

  1. Код искры: как и в предыдущих ответах, вы должны использовать те же версии Scala и Spark для всех компонентов.

    1.1 В автономном режиме существует "ранее существовавшая" установка Spark, к которой могут подключаться приложения (драйверы). Это означает, что все драйверы должны использовать ту же версию Spark, что и на главном и исполнительном устройствах.

    1.2 В YARN/Mesos каждое приложение может использовать другую версию Spark, но все компоненты одного и того же приложения должны использовать один и тот же. Это означает, что если вы использовали версию X для компиляции и упаковки вашего приложения драйвера, вы должны предоставить ту же версию при запуске SparkSession (например, через параметры spark.yarn.archive или spark.yarn.jars при использовании YARN). Банки/архив, которые вы предоставляете, должны включать все зависимости Spark (включая транзитивные зависимости), и он будет отправлен менеджером кластера каждому исполнителю при запуске приложения.

  2. Код драйвера: этот код полностью может быть отправлен в виде кучи банок или "толстой банки", если он включает все зависимости Spark + весь код пользователя

  3. Распределенный код: помимо присутствия в драйвере этот код должен быть отправлен исполнителям (опять же, вместе со всеми его транзитивными зависимостями). Это делается с использованием параметра spark.jars.

Резюмируя, здесь предлагается подход к созданию и развертыванию Spark Application (в данном случае - с использованием YARN):

  • Создайте библиотеку с вашим распределенным кодом, упакуйте ее как "обычную" банку (с файлом.pom, описывающим ее зависимости), так и как "живую банку" (со всеми включенными транзитными зависимостями).
  • Создайте приложение драйвера с зависимостями компиляции от вашей библиотеки распределенных кодов и от Apache Spark (с определенной версией)
  • Упакуйте приложение драйвера в толстую банку, которая будет развернута водителю
  • Передайте правильную версию вашего распределенного кода как значение параметра spark.jars при запуске SparkSession
  • Передайте местоположение файла архива (например, gzip), содержащего все банки в папке lib/ из загруженных двоичных файлов Spark, как значение spark.yarn.archive

Ответ 3

В дополнение к очень обширному ответу, уже заданному пользователем7337271, если проблема возникает из-за отсутствия внешних зависимостей, вы можете создать банку с вашими зависимостями, например. плагин сборки maven

В этом случае убедитесь, что все основные искровые зависимости отмечены как "предоставленные" в вашей системе сборки, и, как уже отмечалось, убедитесь, что они коррелируют с вашей версией искробезопасности.

Ответ 4

Уровни зависимости вашего приложения должны указываться в опции application-jar вашей команды запуска.

Более подробную информацию можно найти в Документация по искры

Взято из документации:

application-jar: путь к объединенной банке, включая ваше приложение и все зависимости. URL должен быть глобально видимым внутри вашего кластер, например, путь hdfs://или путь к файлу://, который является присутствует на всех узлах

Ответ 5

Я думаю, что эта проблема должна решить плагин сборки. Вам нужно построить жирную банку. Например, в sbt:

  • добавить файл $PROJECT_ROOT/project/assembly.sbt с кодом addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
  • to build.sbt added some libraries libraryDependencies ++ = Seq ("com.some.company" %% "some-lib"% "1.0.0") '
  • в консоли sbt введите "сборка" и разверните сборную банку

Если вам нужна дополнительная информация, перейдите на страницу https://github.com/sbt/sbt-assembly

Ответ 6

Добавьте в проект все файлы jar из spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\jars. Спарк-2.4.0-bin-hadoop2.7 можно загрузить с https://spark.apache.org/downloads.html.