Я вижу некоторые проблемы с производительностью во время выполнения запросов с использованием dataframes. Я видел в своих исследованиях, что длительные окончательные задачи могут быть признаком того, что данные не нарушены оптимально, но не нашли подробного процесса для решения этой проблемы.
Я начинаю загрузку двух таблиц как dataframes, и затем я присоединяюсь к этим таблицам в одном поле. Я попытался добавить дистрибутив (repartition) и отсортировать его, чтобы улучшить производительность, но я все еще вижу эту одну длинную конечную задачу. Вот простая версия моего кода, обратите внимание, что запрос один и два на самом деле не так просты и используют UDF для вычисления некоторых значений.
Я пробовал несколько разных настроек для spark.sql.shuffle
. Я пробовал 100, но это не удалось (я действительно не отлаживал это, чтобы быть честным). Я пробовал 300, 4000 и 8000. Производительность снижалась с каждым увеличением. Я выбираю один день данных, каждый из которых составляет час.
val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")
val distributeDf1 = df1
.repartition(df1("userId"))
.sortWithinPartitions(df1("userId"))
val distributeDf2 = df2
.repartition(df2("userId"))
.sortWithinPartitions(df2("userId"))
distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")
val df3 = sqlContext
.sql("""
Select
df1.*
from
df1
left outer join df2 on
df1.userId = df2.userId""")
Так как кажется, что разбиение на userId не является идеальным, я мог бы разбить по метке времени. Если я это сделаю, должен ли я просто сделать дату + час? Если у меня будет менее 200 уникальных комбо для этого, у меня будут пустые исполнители?