Подтвердить что ты не робот

Как загрузить Spark Cassandra Connector в оболочку?

Я пытаюсь использовать Spark Cassandra Connector в Spark 1.1.0.

Я успешно создал файл jar из ведущей ветки на GitHub и получил включенные демоверсии. Однако, когда я пытаюсь загрузить файлы jar в spark-shell, я не могу импортировать ни один из классов из пакета com.datastax.spark.connector.

Я попытался использовать параметр --jars на spark-shell и добавил каталог с файлом jar в Java CLASSPATH. Ни один из этих вариантов не работает. Фактически, когда я использую параметр --jars, вывод журнала показывает, что барабан Datastax загружается, но я все еще ничего не могу импортировать из com.datastax.

Мне удалось загрузить соединитель Tuplejump Calliope Cassandra в spark-shell с помощью --jars, поэтому я знаю, что работаю. Это просто разъем Datastax, который не работает для меня.

4b9b3361

Ответ 1

Я понял. Вот что я сделал:

$ git clone https://github.com/datastax/spark-cassandra-connector.git
$ cd spark-cassandra-connector
$ sbt/sbt assembly
$ $SPARK_HOME/bin/spark-shell --jars ~/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/connector-assembly-1.2.0-SNAPSHOT.jar 

В приглашении scala

scala> sc.stop
scala> import com.datastax.spark.connector._
scala> import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._
scala> import org.apache.spark.SparkConf
scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "my cassandra host")
scala> val sc = new SparkContext("spark://spark host:7077", "test", conf)

Ответ 2

Изменить: теперь немного легче

Для углубленных инструкций проверьте веб-сайт проекта https://github.com/datastax/spark-cassandra-connector/blob/master/doc/13_spark_shell.md

Или не используйте Spark-Packages для загрузки библиотеки (не все версии опубликованы) http://spark-packages.org/package/datastax/spark-cassandra-connector

> $SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M3-s_2.10

Предполагается, что вы работаете с OSS Apache C *

Вам нужно запустить класс с набором -driver-class-path, чтобы включить все ваши соединительные библиотеки

Я приведу сообщение в блоге из знаменитого Аль Тоби

Самый простой способ найти Ive - это установить путь к классу с помощью перезапустите контекст в REPL с необходимыми классами, импортированными в сделать sc.cassandraTable() видимым. Новые загруженные методы не будут отображаться при завершении табуляции. Я не знаю, почему.

  /opt/spark/bin/spark-shell --driver-class-path $(echo /path/to/connector/*.jar |sed 's/ /:/g')

Он выведет пустую информацию журнала, а затем представит приглашение scala > .

scala> sc.stop

Теперь, когда контекст остановлен, его время для импорта соединителя.

scala> import com.datastax.spark.connector._
scala> val conf = new SparkConf()
scala> conf.set("cassandra.connection.host", "node1.pc.datastax.com")
scala> val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf)
scala> val table = sc.cassandraTable("keyspace", "table")
scala> table.count

Если вы работаете с DSE < 4.5.1

Существует небольшая проблема с DSE Classloader и предыдущими соглашениями об именах пакетов, которые не позволят вам найти новые библиотеки искрового коннектора. Вы можете обойти это, удалив строку, указывающую загрузчик класса DSE, в скриптах, запускающих искровую оболочку.

Ответ 3

Если вы хотите избежать остановки/начала контекста в оболочке, вы также можете добавить его в свои свойства искры в:

{spark_install}/conf/spark-defaults.conf

spark.cassandra.connection.host=192.168.10.10

Ответ 4

Чтобы получить доступ к Cassandra из искровой оболочки, я построил сборку из искрового драйвера cassandra со всеми зависимостями ( "uberjar" ). Предоставляя его искровой оболочке с помощью опции -jars, например:

spark-shell --jars spark-cassandra-assembly-1.0.0-SNAPSHOT-jar-with-dependencies.jar

Я столкнулся с той же проблемой, описанной здесь, и этот метод является простым и удобным (вместо загрузки длинного списка зависимостей)

Я создал сущность с файлом POM, который вы можете скачать. Используя pom для создания uberjar, вы должны сделать:

mvn package

Если вы используете sbt, загляните в плагин sbt-assembly.

Ответ 5

Следующие шаги описывают, как настроить сервер как с помощью Spark Node, так и с Cassandra Node.

Настройка Искры с открытым исходным кодом

Это предполагает, что у вас уже есть настройка Cassandra.

Шаг 1: Загрузка и настройка Spark

Go to http://spark.apache.org/downloads.html.

a) Чтобы сделать вещи простыми, мы будем использовать один из готовых пакетов Spark. Выберите Spark версии 2.0.0 и предварительно построенный для Hadoop 2.7, затем Direct Download. Это загрузит архив со встроенными двоичными файлами для Spark.

b) Извлеките это в каталог по вашему выбору. Я поставлю свое в ~/apps/spark-1.2

c) Test Spark работает, открыв оболочку

Шаг 2: Проверьте, что Spark Works

a) cd в каталог Spark Запустите "./bin/spark-shell". Это откроет интерактивную программу оболочки Spark

b) Если все сработало, оно должно отобразить это приглашение: "scala > "

Запустите простой расчет:

sc.parallelize(от 1 до 50).sum(+) который должен выводить 1250.

c) Поздравляем Спарк работает! Выйдите из оболочки Spark с командой "exit"

Разъем Spark Cassandra

Чтобы подключить Spark к кластеру Cassandra, соединитель Cassandra нужно будет добавить в проект Spark. DataStax предоставляет собственный коннектор Cassandra на GitHub, и мы будем использовать его.

  • Клонировать хранилище коннекторов Spark Cassandra:

    https://github.com/datastax/spark-cassandra-connector

  • cd в "искро-кассандра-коннектор" Постройте коннектор Spark Cassandra выполнив команду

    ./sbt/sbt Dscala-2.11 = сборка истинности

Это должно выводить скомпилированные файлы jar в каталог с именем "target". Будут два файла jar, один для Scala и один для Java. Мы заинтересованы в следующем: "spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar" для Scala. Переместите файл jar в удобный каталог: я поместил свой файл в ~/apps/spark-1.2/jars

Чтобы загрузить соединитель в оболочку искры:

запустите оболочку с помощью этой команды:

../bin/spark-shell -jars ~/Приложения/искровой 1.2/банки/искровая Cassandra-разъем сборка-1.1.1-SNAPSHOT.jar

Соедините контекст искры с кластером Cassandra и остановите контекст по умолчанию:

sc.stop

Импортируйте необходимые файлы jar:

import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf

Создайте новый SparkConf с деталями соединения Cassandra:

val conf = новый SparkConf (true).set( "spark.cassandra.connection.host", "Локальный" )

Создайте новый контекст искры:

val sc = новый SparkContext (conf)

Теперь у вас есть новый SparkContext, который связан с вашим кластером Cassandra.

Ответ 6

Spark-Cassandra-Connector Полный код в JAVA с окном-7,8,10 Полезно.

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import spark_conn.Spark_connection;
import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;
import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App implements Serializable
{
    private transient SparkConf conf;

    private App(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        generateData(sc);
        compute(sc);
        showResults(sc);
        sc.stop();
    }

    private void generateData(JavaSparkContext sc) {
    CassandraConnector connector =   CassandraConnector.apply(sc.getConf());

        // Prepare the schema
   try{ 
   Session session=connector.openSession();
   session.execute("DROP KEYSPACE IF EXISTS java_api");
   session.execute("CREATE KEYSPACE java_api WITH 
   replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
   session.execute("CREATE TABLE java_api.products 
   (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
   session.execute("CREATE TABLE java_api.sales 
   (id UUID PRIMARY KEY,  product INT, price DECIMAL)");
   session.execute("CREATE TABLE java_api.summaries 
   (product INT PRIMARY KEY, summary DECIMAL)");
  }catch(Exception e){System.out.println(e);}

        // Prepare the products hierarchy
   List<Product> products = Arrays.asList(
   new Product(0, "All products", Collections.<Integer>emptyList()),
                new Product(1, "Product A", Arrays.asList(0)),
                new Product(4, "Product A1", Arrays.asList(0, 1)),
                new Product(5, "Product A2", Arrays.asList(0, 1)),
                new Product(2, "Product B", Arrays.asList(0)),
                new Product(6, "Product B1", Arrays.asList(0, 2)),
                new Product(7, "Product B2", Arrays.asList(0, 2)),
                new Product(3, "Product C", Arrays.asList(0)),
                new Product(8, "Product C1", Arrays.asList(0, 3)),
                new Product(9, "Product C2", Arrays.asList(0, 3))
    );

   JavaRDD<Product> productsRDD = sc.parallelize(products);
   javaFunctions(productsRDD, Product.class).
   saveToCassandra("java_api", "products");

   JavaRDD<Sale> salesRDD = productsRDD.filter
   (new Function<Product, Boolean>() {
            @Override
            public Boolean call(Product product) throws Exception {
                return product.getParents().size() == 2;
            }
        }).flatMap(new FlatMapFunction<Product, Sale>() {
            @Override
            public Iterable<Sale> call(Product product) throws Exception {
                Random random = new Random();
                List<Sale> sales = new ArrayList<>(1000);
                for (int i = 0; i < 1000; i++) {
                  sales.add(new Sale(UUID.randomUUID(), 
                 product.getId(), BigDecimal.valueOf(random.nextDouble())));
                }
                return sales;
            }
        });

      javaFunctions(salesRDD, Sale.class).saveToCassandra
      ("java_api", "sales");
    }

    private void compute(JavaSparkContext sc) {
        JavaPairRDD<Integer, Product> productsRDD = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class)
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });

        JavaPairRDD<Integer, Sale> salesRDD = javaFunctions(sc)
                .cassandraTable("java_api", "sales", Sale.class)
                .keyBy(new Function<Sale, Integer>() {
                    @Override
                    public Integer call(Sale sale) throws Exception {
                        return sale.getProduct();
                    }
                });

        JavaPairRDD<Integer, Tuple2<Sale, Product>> joinedRDD = salesRDD.join(productsRDD);

        JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() {
            @Override
            public Iterable<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception {
                Tuple2<Sale, Product> saleWithProduct = input._2();
                List<Tuple2<Integer, BigDecimal>> allSales = new ArrayList<>(saleWithProduct._2().getParents().size() + 1);
                allSales.add(new Tuple2<>(saleWithProduct._1().getProduct(), saleWithProduct._1().getPrice()));
                for (Integer parentProduct : saleWithProduct._2().getParents()) {
                    allSales.add(new Tuple2<>(parentProduct, saleWithProduct._1().getPrice()));
                }
                return allSales;
            }
        });

        JavaRDD<Summary> summariesRDD = allSalesRDD.reduceByKey(new Function2<BigDecimal, BigDecimal, BigDecimal>() {
            @Override
            public BigDecimal call(BigDecimal v1, BigDecimal v2) throws Exception {
                return v1.add(v2);
            }
        }).map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
            @Override
            public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception {
                return new Summary(input._1(), input._2());
            }
        });

        javaFunctions(summariesRDD, Summary.class).saveToCassandra("java_api", "summaries");
    }

    private void showResults(JavaSparkContext sc) {
        JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
                .cassandraTable("java_api", "summaries", Summary.class)
                .keyBy(new Function<Summary, Integer>() {
                    @Override
                    public Integer call(Summary summary) throws Exception {
                        return summary.getProduct();
                    }
                });

        JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class)
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });

        List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().toArray();

        for (Tuple2<Product, Optional<Summary>> result : results) {
            System.out.println(result);
        }
    }

    public static void main(String[] args) {
//        if (args.length != 2) {
//            System.err.println("Syntax: com.datastax.spark.demo.App <Spark Master URL> <Cassandra contact point>");
//            System.exit(1);
//        }

//      SparkConf conf = new SparkConf(true)
//        .set("spark.cassandra.connection.host", "127.0.1.1")
//        .set("spark.cassandra.auth.username", "cassandra")            
//        .set("spark.cassandra.auth.password", "cassandra");

        //SparkContext sc = new SparkContext("spark://127.0.1.1:9045", "test", conf);

        //return ;

        /* try{
            SparkConf conf = new SparkConf(true); 
            conf.setAppName("Spark-Cassandra Integration");
            conf.setMaster("yarn-cluster");
            conf.set("spark.cassandra.connection.host", "192.168.1.200");
            conf.set("spark.cassandra.connection.rpc.port", "9042");
            conf.set("spark.cassandra.connection.timeout_ms", "40000");
            conf.set("spark.cassandra.read.timeout_ms", "200000");
            System.out.println("Hi.......Main Method1111...");
            conf.set("spark.cassandra.auth.username","cassandra");
            conf.set("spark.cassandra.auth.password","cassandra");
            System.out.println("Connected Successful...!\n");
            App app = new App(conf);
            app.run();
       }catch(Exception e){System.out.println(e);}*/

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
//     conf.setMaster(args[0]);
//        conf.set("spark.cassandra.connection.host", args[1]);
          conf.setMaster("spark://192.168.1.117:7077");
          conf.set("spark.cassandra.connection.host", "192.168.1.200");
          conf.set("spark.cassandra.connection.port", "9042");
          conf.set("spark.ui.port","4040");
          conf.set("spark.cassandra.auth.username","cassandra");
          conf.set("spark.cassandra.auth.password","cassandra");
       App app = new App(conf);
        app.run();
    }

    public static class Product implements Serializable {
        private Integer id;
        private String name;
        private List<Integer> parents;

        public Product() { }

        public Product(Integer id, String name, List<Integer> parents) {
            this.id = id;
            this.name = name;
            this.parents = parents;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        public List<Integer> getParents() { return parents; }
        public void setParents(List<Integer> parents) { this.parents = parents; }

        @Override
        public String toString() {
            return MessageFormat.format("Product'{'id={0}, name=''{1}'', parents={2}'}'", id, name, parents);
        }
    }

    public static class Sale implements Serializable {
        private UUID id;
        private Integer product;
        private BigDecimal price;

        public Sale() { }

        public Sale(UUID id, Integer product, BigDecimal price) {
            this.id = id;
            this.product = product;
            this.price = price;
        }

        public UUID getId() { return id; }
        public void setId(UUID id) { this.id = id; }

        public Integer getProduct() { return product; }
        public void setProduct(Integer product) { this.product = product; }

        public BigDecimal getPrice() { return price; }
        public void setPrice(BigDecimal price) { this.price = price; }

        @Override
        public String toString() {
            return MessageFormat.format("Sale'{'id={0}, product={1}, price={2}'}'", id, product, price);
        }
    }

    public static class Summary implements Serializable {
        private Integer product;
        private BigDecimal summary;

        public Summary() { }

        public Summary(Integer product, BigDecimal summary) {
            this.product = product;
            this.summary = summary;
        }

        public Integer getProduct() { return product; }
        public void setProduct(Integer product) { this.product = product; }

        public BigDecimal getSummary() { return summary; }
        public void setSummary(BigDecimal summary) { this.summary = summary; }

        @Override
        public String toString() {
            return MessageFormat.format("Summary'{'product={0}, summary={1}'}'", product, summary);
        }
    }
}