Подтвердить что ты не робот

Почему происходит сбой соединения с "java.util.concurrent.TimeoutException: фьючерсы истекли через [300 секунд]"?

Я использую Spark 1.5.

У меня есть два фрейма данных формы:

scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]

scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]

libriFirstTable50Plus3DF имеет 766,151 записей, а linkPersonItemLessThan500DF имеет 26,694,353 записи. Обратите внимание, что я использую repartition(number) в linkPersonItemLessThan500DF, так как я намерен присоединиться к этим двум позже. Я следую приведенному выше коду:

val userTripletRankDF = linkPersonItemLessThan500DF
     .join(libriFirstTable50Plus3DF, Seq("family_id"))
     .take(20)
     .foreach(println(_))

для которого я получаю этот вывод:

16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:        at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
 at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
 at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
 at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
 at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
 at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
 at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
 at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
 at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
 at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
 at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
 at $iwC$$iwC$$iwC.<init>(<console>:93)
 at $iwC$$iwC.<init>(<console>:95)
 at $iwC.<init>(<console>:97)
 at <init>(<console>:99)
 at .<init>(<console>:103)
 at .<clinit>(<console>)
 at .<init>(<console>:7)
 at .<clinit>(<console>)
 at $print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
 at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
 at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
 at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
 at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
 at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

и я не понимаю, в чем проблема. Это так же просто, как увеличение времени ожидания? Является ли соединение слишком интенсивным? Нужно ли мне больше памяти? Является ли интенсивное переполнение? Может ли кто-нибудь помочь?

4b9b3361

Ответ 1

Это происходит потому, что Spark пытается выполнить Broadcast Hash Join, а один из фреймов данных очень большой, поэтому его отправка занимает много времени.

Вы можете:

  1. Установите выше значение spark.sql.broadcastTimeout чтобы увеличить время ожидания - spark.conf.set("spark.sql.broadcastTimeout", newValueForExample36000)
  2. persist persist() оба DataFrames, тогда Spark будет использовать Shuffle Join - ссылка отсюда

PySpark

В PySpark вы можете установить конфигурацию при создании контекста spark следующим образом:

spark = SparkSession
  .builder
  .appName("Your App")
  .config("spark.sql.broadcastTimeout", "36000")
  .getOrCreate()

Ответ 2

Просто добавьте некоторый контекст кода к очень краткому ответу от @T. Гавенда.


В вашем приложении Spark Spark SQL выбрал широковещательное хеш-соединение для объединения, поскольку "libriFirstTable50Plus3DF имеет 766 151 записей", что оказалось меньше, чем так называемый широковещательный порог (по умолчанию 10 МБ).

Вы можете контролировать порог широковещания, используя свойство конфигурации spark.sql.autoBroadcastJoinThreshold.

spark.sql.autoBroadcastJoinThreshold Настраивает максимальный размер в байтах для таблицы, которая будет транслироваться всем рабочим узлам при выполнении объединения. При установке этого значения на -1 вещание может быть отключено. Обратите внимание, что в настоящее время статистика поддерживается только для таблиц Hive Metastore, в которых была выполнена команда ANALYZE TABLE COMPUTE STATISTICS noscan.

Вы можете найти этот конкретный тип соединения в трассировке стека:

org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala: 110)

Физический оператор BroadcastHashJoin в Spark SQL использует широковещательную переменную для распространения меньшего набора данных среди исполнителей Spark (вместо того, чтобы отправлять его копию при каждой задаче).

Если вы использовали explain для просмотра физического плана запроса, вы заметили, что в запросе используется физический оператор BroadcastExchangeExec. Здесь вы можете увидеть базовый механизм трансляции таблицы меньшего размера (и время ожидания).

override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
  ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
}

doExecuteBroadcast является частью контракта SparkPlan которым следует каждый физический оператор в Spark SQL, который позволяет широковещательную SparkPlan при необходимости. BroadcastExchangeExec случается с этим.

Параметр времени ожидания - это то, что вы ищете.

private val timeout: Duration = {
  val timeoutValue = sqlContext.conf.broadcastTimeout
  if (timeoutValue < 0) {
    Duration.Inf
  } else {
    timeoutValue.seconds
  }
}

Как вы можете видеть, вы можете полностью отключить его (используя отрицательное значение), что подразумевает ожидание бесконечной sqlContext.conf.broadcastTimeout широковещательной переменной исполнителям или использование sqlContext.conf.broadcastTimeout который является в точности свойством конфигурации spark.sql.broadcastTimeout. Значение по умолчанию составляет 5 * 60 секунд, которые вы можете увидеть в трассировке стека:

java.util.concurrent.TimeoutException: время фьючерса истекло после [300 секунд]

Ответ 3

В моем случае это было вызвано передачей через большой фрейм данных:

df.join(broadcast(largeDF))

Итак, основываясь на предыдущих ответах, я исправил это, удалив трансляцию:

df.join(largeDF)

Ответ 4

В дополнение к увеличению spark.sql.broadcastTimeout или persist() обоих фреймов данных,

Вы можете попробовать:

1. отключить трансляцию, установив spark.sql.autoBroadcastJoinThreshold в -1

2. увеличить память искрового драйвера, установив spark.driver.memory на более высокое значение.