Подтвердить что ты не робот

Как читать из hbase с использованием искры

Следующий код будет считываться с hbase, а затем преобразовать его в json-структуру и преобразовать в schemaRDD. Но проблема в том, что я using List для хранения json-строки, а затем передаю javaRDD, для данных около 100 ГБ мастер будет загружен данными в память. Каков правильный способ загрузки данных из hbase, затем выполните манипуляции, а затем конвертируйте в JavaRDD.

package hbase_reader;


import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;

import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;

import com.google.common.collect.Lists;

public class hbase_reader {

    public static void main(String[] args) throws IOException, ParseException {

        List<String> jars = Lists.newArrayList("");

        SparkConf spconf = new SparkConf();
        spconf.setMaster("local[2]");
        spconf.setAppName("HBase");
        //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
        spconf.setJars(jars.toArray(new String[jars.size()]));
        JavaSparkContext sc = new JavaSparkContext(spconf);
        //spconf.set("spark.executor.memory", "1g");

        JavaSQLContext jsql = new JavaSQLContext(sc);


        HBaseConfiguration conf = new HBaseConfiguration();
        String tableName = "HBase.CounData1_Raw_Min1";
        HTable table = new HTable(conf,tableName);
        try {

            ResultScanner scanner = table.getScanner(new Scan());
            List<String> jsonList = new ArrayList<String>();

            String json = null;

            for(Result rowResult:scanner) {
                json = "";
                String rowKey  = Bytes.toString(rowResult.getRow());
                for(byte[] s1:rowResult.getMap().keySet()) {
                    String s1_str = Bytes.toString(s1);

                    String jsonSame = "";
                    for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
                        String s2_str = Bytes.toString(s2);
                        for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
                            String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
                            jsonSame += "\""+s2_str+"\":"+s3_str+",";
                        }
                    }
                    jsonSame = jsonSame.substring(0,jsonSame.length()-1);
                    json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
                }
                json = json.substring(0,json.length()-1);
                json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
                jsonList.add(json);
            }

            JavaRDD<String> jsonRDD = sc.parallelize(jsonList);

            JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);




            System.out.println(schemaRDD.take(2));

        } finally {
            table.close();
        }

    }

}
4b9b3361

Ответ 1

Основной пример для чтения данных HBase с использованием Spark (Scala). Вы также можете использовать это в Java:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.spark._

object HBaseRead {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "table1"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found : " + hBaseRDD.count())
    sc.stop()
  }
}

ОБНОВЛЕНО -2016

Начиная с Spark 1.0.x +, теперь вы также можете использовать Spark-HBase Connector:

Зависимость Maven для включения:

<dependency>
  <groupId>it.nerdammer.bigdata</groupId>
  <artifactId>spark-hbase-connector_2.10</artifactId>
  <version>1.0.3</version> // Version can be changed as per your Spark version, I am using Spark 1.6.x
</dependency>

И найдите приведенный ниже пример кода для него:

import org.apache.spark._
import it.nerdammer.spark.hbase._

object HBaseRead extends App {
    val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]")
    sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //e.g. 192.168.1.1 or localhost or your hostanme
    val sc = new SparkContext(sparkConf)

    // For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then:

    val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document")
    .select("DocID", "Title").inColumnFamily("SMPL")

    println("Number of Records found : " + docRdd .count())
}

ОБНОВЛЕНО - 2017

Начиная с Spark 1.6.x +, теперь вы также можете использовать SHC Connector (пользователи Hortonworks или HDP):

Зависимость Maven для включения:

    <dependency>
        <groupId>com.hortonworks</groupId>
        <artifactId>shc</artifactId>
        <version>1.0.0-2.0-s_2.11</version> // Version depends on the Spark version and is supported upto Spark 2.x
    </dependency>

Основным преимуществом использования этого разъема является то, что он обладает гибкостью в определении схемы и не нуждается в параметрах Hardcoded, как в разъёме nerdammer/spark-hbase. Также помните, что он поддерживает Spark 2.x, поэтому этот разъем довольно гибкий и обеспечивает сквозную поддержку в разделе "Проблемы и PR".

Найдите следующий путь репозитория для последней версии и образцов:

Коннектор Hortonworks Spark HBase

Вы также можете преобразовать этот RDD в DataFrames и запустить SQL поверх него, или вы можете сопоставить эти Dataset или DataFrames с определенными пользователем классами Java Pojo или Case. Он работает блестяще.

Прошу прокомментировать ниже, если вам нужно что-то еще.

Ответ 2

Я предпочитаю читать из hbase и делать манипуляции json в искры.
Spark предоставляет функцию JavaSparkContext.newAPIHadoopRDD для чтения данных из хранилища хаопов, включая HBase. Вам нужно будет указать конфигурацию HBase, имя таблицы и сканирование в параметре конфигурации и в формате ввода таблицы, а значение ключа

Вы можете использовать класс table input format и его параметр job для предоставления имени таблицы и конфигурации сканирования

Пример:

conf.set(TableInputFormat.INPUT_TABLE, "tablename");
JavaPairRDD<ImmutableBytesWritable, Result> data = 
jsc.newAPIHadoopRDD(conf, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);

тогда вы можете сделать манипуляцию json в искры. Поскольку искра может выполнять пересчет, когда память заполнена, она загружает только данные, необходимые для части пересчета (cmiiw), поэтому вам не нужно беспокоиться о размере данных

Ответ 3

просто добавьте комментарий о том, как добавить сканирование:

TableInputFormat имеет следующие атрибуты:

  • SCAN_ROW_START
  • SCAN_ROW_STOP
conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey");
conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");

Ответ 4

Поскольку вопрос не является новым, на данный момент существует несколько других альтернатив:

  • hbase-spark, модуль, доступный непосредственно в репозитории HBase
  • Spark-on-HBase от Hortonworks

Я не знаю много о первом проекте, но похоже, что он не поддерживает Spark 2.x. Тем не менее, он имеет богатую поддержку на уровне RDD для Spark 1.6.x.

Spark-on-HBase, с другой стороны, имеет ветки для Spark 2.0 и предстоящего Spark 2.1. Этот проект очень перспективен, поскольку он ориентирован на API Dataset/DataFrame. Под капотом он реализует стандартный API-интерфейс Spark Datasource и использует механизм Spark Catalyst для оптимизации запросов. Разработчики заявляют здесь, что он способен к обрезке разделов, обрезке столбцов, откату предикатов и достижению локализации данных.

Ниже представлен простой пример, в котором используется артефакт com.hortonworks:shc:1.0.0-2.0-s_2.11 из этого repo и Spark 2.0.2:

case class Record(col0: Int, col1: Int, col2: Boolean)

val spark = SparkSession
  .builder()
  .appName("Spark HBase Example")
  .master("local[4]")
  .getOrCreate()

def catalog =
  s"""{
      |"table":{"namespace":"default", "name":"table1"},
      |"rowkey":"key",
      |"columns":{
      |"col0":{"cf":"rowkey", "col":"key", "type":"int"},
      |"col1":{"cf":"cf1", "col":"col1", "type":"int"},
      |"col2":{"cf":"cf2", "col":"col2", "type":"boolean"}
      |}
      |}""".stripMargin

val artificialData = (0 to 100).map(number => Record(number, number, number % 2 == 0))

// write
spark
  .createDataFrame(artificialData)
  .write
  .option(HBaseTableCatalog.tableCatalog, catalog)
  .option(HBaseTableCatalog.newTable, "5")
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

// read
val df = spark
  .read
  .option(HBaseTableCatalog.tableCatalog, catalog)
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

df.count()