Подтвердить что ты не робот

Исключительная задача Spark занимает в 100 раз больше, чем первая 199, как улучшить

Я вижу некоторые проблемы с производительностью во время выполнения запросов с использованием dataframes. Я видел в своих исследованиях, что длительные окончательные задачи могут быть признаком того, что данные не нарушены оптимально, но не нашли подробного процесса для решения этой проблемы.

Я начинаю загрузку двух таблиц как dataframes, и затем я присоединяюсь к этим таблицам в одном поле. Я попытался добавить дистрибутив (repartition) и отсортировать его, чтобы улучшить производительность, но я все еще вижу эту одну длинную конечную задачу. Вот простая версия моего кода, обратите внимание, что запрос один и два на самом деле не так просты и используют UDF для вычисления некоторых значений.

Я пробовал несколько разных настроек для spark.sql.shuffle. Я пробовал 100, но это не удалось (я действительно не отлаживал это, чтобы быть честным). Я пробовал 300, 4000 и 8000. Производительность снижалась с каждым увеличением. Я выбираю один день данных, каждый из которых составляет час.

val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")

val distributeDf1 = df1
    .repartition(df1("userId"))
    .sortWithinPartitions(df1("userId"))

val distributeDf2 = df2
    .repartition(df2("userId"))
    .sortWithinPartitions(df2("userId"))

distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")

val df3 = sqlContext
  .sql("""
    Select 
      df1.* 
    from 
      df1 
    left outer join df2 on 
      df1.userId = df2.userId""")

Так как кажется, что разбиение на userId не является идеальным, я мог бы разбить по метке времени. Если я это сделаю, должен ли я просто сделать дату + час? Если у меня будет менее 200 уникальных комбо для этого, у меня будут пустые исполнители?

4b9b3361

Ответ 1

У вас явно есть проблема с огромным правильным перекосом данных. Давайте посмотрим статистику, которую вы предоставили:

df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088] 
df2 = [mean=1.0, stddev=0.0, count=18408194]

Со средним значением около 5 и стандартным отклонением более 2000 вы получаете длинный хвост.

Поскольку некоторые ключи намного чаще, чем другие, после перераспределения некоторых исполнителей будет гораздо больше работы, чем остальных.

Кроме того, ваше описание подсказывает, что проблема может быть связана с одним или несколькими ключами, которые имеют хэш в том же разделе.

Итак, сначала определите выбросы (псевдокод):

val mean = 4.989209978967438 
val sd = 2255.654165352454

val df1 = sqlContext.sql("Select * from Table1")
val counts = df.groupBy("userId").count.cache

val frequent = counts
  .where($"count" > mean + 2 * sd)  // Adjust threshold based on actual dist.
  .alias("frequent")
  .join(df1, Seq("userId"))

а остальное:

val infrequent = counts
  .where($"count" <= mean + 2 * sd)
  .alias("infrequent")
  .join(df1, Seq("userId"))

Можно ли ожидать чего-то? Если нет, попробуйте определить источник проблемы вверх по течению.

Если ожидается, вы можете попробовать:

  • передача меньшего стола:

    val df2 = sqlContext.sql("Select * from Table2")
    df2.join(broadcast(df1), Seq("userId"), "rightouter")
    
  • расщепление, объединение (union) и широковещательная передача:

    df2.join(broadcast(frequent), Seq("userId"), "rightouter")
      .union(df2.join(infrequent, Seq("userId"), "rightouter"))
    
  • соление userId с некоторыми случайными данными

но вы не должны:

  • перераспределить все данные и отсортировать их локально (хотя сортировка только локально не должна быть проблемой)
  • выполнять стандартные хеш-соединения для полных данных.