Подтвердить что ты не робот

Как обновить переменную широковещания в потоке искрового потока?

У меня, я считаю, относительно распространенный прецедент для искрового потока:

У меня есть поток объектов, которые я хотел бы фильтровать на основе некоторых справочных данных

Вначале я думал, что это будет очень простая задача для достижения использования широковещательной переменной:

public void startSparkEngine {
    Broadcast<ReferenceData> refdataBroadcast
      = sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    }

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

Однако, хотя редко, мои справочные данные будут периодически меняться

У меня создалось впечатление, что я могу модифицировать и повторно транслировать мою переменную на драйвер, и она будет распространена среди каждого из рабочих, однако объект Broadcast не Serializable и должен быть final.

Какие у меня альтернативы? Три решения, о которых я могу думать, следующие:

  • Переместите поиск ссылочных данных в forEachPartition или forEachRdd, чтобы он полностью находился на рабочих. Однако ссылочные данные живут в REST API, поэтому мне также нужно каким-то образом хранить таймер/счетчик, чтобы остановить доступ к удаленному доступу для каждого элемента в потоке.

  • Перезапустите контекст Spark каждый раз при обновлении refdata с новой переменной Broadcast.

  • Преобразуйте ссылочные данные в RDD, а затем join потоки таким образом, чтобы теперь я потоковал Pair<MyObject, RefData>, хотя это будет отправлять опорные данные каждому объекту.

4b9b3361

Ответ 1

Расширение ответа. @Rohan Aletty. Вот пример кода BroadcastWrapper, который обновляет широковещательную переменную на основе некоторого ttl

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper(){}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
       JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
       return jsc;
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime()-lastUpdatedAt.getTime();
        if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
            if (var != null)
               var.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());

            //Your logic to refresh
            ReferenceData data = getRefData();

            var = getSparkContext(sparkContext).broadcast(data);
       }
       return var;
   }
}

Ваш код будет выглядеть так:

public void startSparkEngine() {

    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
        Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context());

        stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
        // Final processing of filtered objects
        });
        return null;
    });
}

Это сработало для меня и для нескольких кластеров. Надеюсь, что это поможет

Ответ 2

Почти все, кто имеет дело с потоковыми приложениями, нуждаются в способе перетаскивания (фильтра, поиска и т.д.) справочных данных (из БД, файлов и т.д.) в потоковые данные. Мы имеем частичное решение всех двух частей

  • Справочные данные поиска, которые будут использоваться в потоковых операциях

    • создать объект CacheLookup с нужным кешем TTL
    • оберните это в Broadcast
    • использовать CacheLookup как часть потоковой логики

В большинстве случаев это работает отлично, за исключением следующих

  1. Обновить справочные данные

    Нет никакого определенного пути достижения этого, несмотря на предложения в этих потоках, т.е. убить предыдущую переменную широковещания и создать новую. Множественные неизвестные, как ожидаемые между этими операциями.

Это такая общая потребность, это помогло бы, если бы был способ отправить информацию для обновления информирования переменной трансляции. С этим можно сделать недействительными локальные кеши в "CacheLookup"

Вторая часть проблемы все еще не решена. Мне было бы интересно, если есть какой-либо жизнеспособный подход к этому

Ответ 3

Не уверен, что вы уже пробовали это, но я думаю, что обновление переменной широковещания может быть достигнуто без отключения SparkContext. Используя метод unpersist(), копии широковещательной переменной удаляются для каждого исполнителя и должны быть переменными, которые должны быть ретранслированы чтобы получить доступ снова. Для вашего варианта использования, когда вы хотите обновить свою трансляцию, вы можете:

  • Подождите, пока ваши исполнители закончат текущую серию данных.

  • Непродвигать переменную широковещания

  • Обновить широковещательную переменную

  • Rebroadcast для отправки новых справочных данных исполнителям

Я рисую довольно сильно из этот пост, но тот, кто сделал последний ответ, утверждал, что получил его работу локально. Важно отметить, что вы, вероятно, хотите установить блокировку на true на непервисте, чтобы вы могли быть уверены, что исполнители избавлены от старых данных (поэтому устаревшие значения не будут прочитаны снова на следующей итерации).

Ответ 4

Недавно столкнулся с проблемой с этим. Думаю, это может быть полезно для пользователей Scala..

Scala способ сделать BroadCastWrapper, как BroadCastWrapper ниже.

import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag

/* wrapper lets us update brodcast variables within DStreams' foreachRDD
 without running into serialization issues */
case class BroadcastWrapper[T: ClassTag](
 @transient private val ssc: StreamingContext,
  @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = {

    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}

Каждый раз, когда вам нужно вызвать функцию обновления, чтобы получить новую широковещательную переменную.

Ответ 5

Я попробовал ниже

import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag

/* wrapper lets us update brodcast variables within DStreams' foreachRDD
 without running into serialization issues */
case class BroadcastWrapper[T: ClassTag](
 @transient private val ssc: StreamingContext,
  @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = {

    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}

Но получаю ошибку ниже.

java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

Не могли бы вы мне помочь.