Подтвердить что ты не робот

Обработка Xml в Spark

Сценарий: Мои входные данные будут представлять собой несколько небольших XML-данных и, как предполагается, читать эти XML файлы в виде RDD. Выполните соединение с другим набором данных и сформируйте RDD и отправьте вывод как XML.

Можно ли читать XML с использованием искры, загружать данные как RDD? Если возможно, как будет читаться XML.

Пример XML:

<root>
    <users>
        <user>
              <account>1234<\account>
              <name>name_1<\name>
              <number>34233<\number>
         <\user>
         <user>
              <account>58789<\account>
              <name>name_2<\name>
              <number>54697<\number>
         <\user>    
    <\users>
<\root>

Как это будет загружено в RDD?

4b9b3361

Ответ 1

Да, это возможно, но детали будут отличаться в зависимости от выбранного вами подхода.

  • Если файлы имеют небольшой размер, как вы упомянули, самое простое решение - загрузить ваши данные, используя SparkContext.wholeTextFiles. Он загружает данные как RDD[(String, String)], где первый элемент - это путь, а второй - содержимое файла. Затем вы анализируете каждый файл по отдельности, как в локальном режиме.
  • Для больших файлов вы можете использовать форматы ввода Hadoop.
    • Если структура проста, вы можете разделить записи, используя textinputformat.record.delimiter. Вы можете найти простой пример здесь. Ввод - это не XML, а то, что вы должны дать, и идея, как действовать
    • В противном случае Mahout предоставляет XmlInputFormat
  • Наконец, можно прочитать файл, используя SparkContext.textFile, и позже настроить его на охват записи между разделами. Концептуально это означает нечто подобное созданию скользящего окна или разбиения записей на группы фиксированного размера:

    • использовать разделы mapPartitionsWithIndex для выявления разорванных записей, собирать разорванные записи
    • использовать вторую mapPartitionsWithIndex для восстановления испорченных записей

Редактирование:

Существует также относительно новый пакет spark-xml, который позволяет извлекать определенные записи по тегу:

val df = sqlContext.read
  .format("com.databricks.spark.xml")
   .option("rowTag", "foo")
   .load("bar.xml")

Ответ 2

Вот способ выполнить это, используя HadoopInputFormats для чтения XML-данных в искре, как это объясняется @zero323.

Входные данные:

<root>
    <users>
        <user>
            <account>1234<\account>
            <name>name_1<\name>
            <number>34233<\number>
        <\user>
        <user>
            <account>58789<\account>
            <name>name_2<\name>
            <number>54697<\number>
        <\user>
    <\users>
<\root>

Код для чтения ввода XML:

По этой ссылке

вы получите несколько баночекИмпорт:

//---------------spark_import
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext

//----------------xml_loader_import
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{ LongWritable, Text }
import com.cloudera.datascience.common.XmlInputFormat

Код:

object Tester_loader {
  case class User(account: String, name: String, number: String)
  def main(args: Array[String]): Unit = {

    val sparkHome = "/usr/big_data_tools/spark-1.5.0-bin-hadoop2.6/"
    val sparkMasterUrl = "spark://SYSTEMX:7077"

    var jars = new Array[String](3)

    jars(0) = "/home/hduser/Offload_Data_Warehouse_Spark.jar"
    jars(1) = "/usr/big_data_tools/JARS/Spark_jar/avro/spark-avro_2.10-2.0.1.jar"

    val conf = new SparkConf().setAppName("XML Reading")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .setSparkHome(sparkHome)
      .set("spark.executor.memory", "512m")
      .set("spark.default.deployCores", "12")
      .set("spark.cores.max", "12")
      .setJars(jars)

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // ---- loading user from XML

    // calling function 1.1
    val pages = readFile("src/input_data", "<user>", "<\\user>", sc) 

    val xmlUserDF = pages.map { tuple =>
      {
        val account = extractField(tuple, "account")
        val name = extractField(tuple, "name")
        val number = extractField(tuple, "number")

        User(account, name, number)
      }
    }.toDF()
    println(xmlUserDF.count())
    xmlUserDF.show()
  }

Функции:

  def readFile(path: String, start_tag: String, end_tag: String,
      sc: SparkContext) = {

    val conf = new Configuration()
    conf.set(XmlInputFormat.START_TAG_KEY, start_tag)
    conf.set(XmlInputFormat.END_TAG_KEY, end_tag)
    val rawXmls = sc.newAPIHadoopFile(
        path, classOf[XmlInputFormat], classOf[LongWritable],
        classOf[Text], conf)

    rawXmls.map(p => p._2.toString)
  }

  def extractField(tuple: String, tag: String) = {
    var value = tuple.replaceAll("\n", " ").replace("<\\", "</")

    if (value.contains("<" + tag + ">") &&
        value.contains("</" + tag + ">")) {
      value = value.split("<" + tag + ">")(1).split("</" + tag + ">")(0)
    }
    value
  }

}

Выход:

+-------+------+------+
|account|  name|number|
+-------+------+------+
|   1234|name_1| 34233|
|  58789|name_2| 54697|
+-------+------+------+

Полученный результат находится в кадрах данных, которые можно преобразовать в RDD согласно вашему требованию, например this->

val xmlUserRDD = xmlUserDF.toJavaRDD.rdd.map { x =>
    (x.get(0).toString(),x.get(1).toString(),x.get(2).toString()) }

Пожалуйста, оцените это, если это может помочь вам, как.

Ответ 3

Это поможет вам.

package packagename;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import com.databricks.spark.xml.XmlReader;

public class XmlreaderSpark {
    public static void main(String arr[]){
    String localxml="file path";
    String booksFileTag = "user";

    String warehouseLocation = "file:" + System.getProperty("user.dir") + "spark-warehouse";
   System.out.println("warehouseLocation" + warehouseLocation);
    SparkSession spark = SparkSession
              .builder()
              .master("local")
              .appName("Java Spark SQL Example")
              .config("spark.some.config.option", "some-value").config("spark.sql.warehouse.dir", warehouseLocation)
              .enableHiveSupport().config("set spark.sql.crossJoin.enabled", "true")
              .getOrCreate();
    SQLContext sqlContext = new SQLContext(spark);

    Dataset<Row> df = (new XmlReader()).withRowTag(booksFileTag).xmlFile(sqlContext, localxml);
    df.show();

    }
}

Вам нужно добавить эту зависимость в свой POM.xml:

<dependency>
   <groupId>com.databricks</groupId>
   <artifactId>spark-xml_2.10</artifactId>
   <version>0.4.0</version>
</dependency>

и ваш входной файл не в правильном формате.

Спасибо.

Ответ 4

Есть два хороших варианта для простых случаев:

  • wholeTextFiles. Используйте метод сопоставления с вашим парсером XML, который может быть Scala XML pull parser (быстрее кода) или SAX Pull Parser (лучшая производительность).
  • Hadoop streaming XMLInputFormat, который вы должны определить начальный и конечный тег <user> </user> для его обработки, однако он создает один раздел для каждого пользовательского тега
  • spark-xml package также является хорошим вариантом.

Со всеми параметрами вы ограничены только обработкой простых XML-данных, которые могут быть интерпретированы как набор данных с строками и столбцами.

Однако, если мы сделаем его немного сложным, эти параметры не будут полезны.

Например, если у вас есть еще один объект:

<root>
    <users>
    <user>...</users>
    <companies>
    <company>...</companies>
</root>

Теперь вам нужно сгенерировать 2 RDD и изменить ваш синтаксический анализатор, чтобы распознать тег <company>.

Это простой случай, но XML может быть намного сложнее, и вам нужно будет добавить все больше изменений.

Чтобы решить эту сложность, мы построили Flexter поверх Apache Spark, чтобы избавиться от обработки XML файлов на Spark. Я также рекомендую прочитать преобразование XML в Spark в Parquet. В последнем сообщении также содержатся некоторые примеры кода, которые показывают, как вывод может быть запрошен с помощью SparkSQL.

Отказ от ответственности: я работаю для Sonra