Подтвердить что ты не робот

Какой наиболее эффективный способ фильтрации DataFrame

..., проверив, находится ли значение столбца в seq.
Возможно, я не очень хорошо объясняю это, я в основном хочу этого (чтобы выразить это с помощью обычного SQL): DF_Column IN seq?

Сначала я сделал это с помощью broadcast var (где я поместил seq), UDF (это проверили) и registerTempTable.
Проблема в том, что я не смог ее протестировать, так как я столкнулся с известной ошибкой, которая, по-видимому, появляется только при использовании registerTempTable с ScalaIDE.

Я закончил создание нового DataFrame из seq и выполнял внутреннее соединение с ним (пересечение), но я сомневаюсь, что это самый эффективный способ выполнения задачи.

Спасибо

РЕДАКТИРОВАТЬ: (в ответ на @YijieShen):
Как сделать filter на основе того, находятся ли элементы одного столбца DataFrame в другом столбце DF (например, SQL select * from A where login in (select username from B))?

например: Первый DF:

login      count
login1     192  
login2     146  
login3     72   

Второй DF:

username
login2
login3
login4

Результат:

login      count
login2     146  
login3     72   

Попытки:
EDIT-2: Я думаю, теперь, когда ошибка исправлена, они должны работать. END EDIT-2

ordered.select("login").filter($"login".contains(empLogins("username")))

и

ordered.select("login").filter($"login" in empLogins("username"))

которые оба бросают Exception in thread "main" org.apache.spark.sql.AnalysisException, соответственно:

resolved attribute(s) username#10 missing from login#8 in operator 
!Filter Contains(login#8, username#10);

и

resolved attribute(s) username#10 missing from login#8 in operator 
!Filter login#8 IN (username#10);
4b9b3361

Ответ 1

  • Вы должны транслировать Set вместо Array, намного быстрее, чем линейный.

  • Вы можете заставить Eclipse запустить приложение Spark. Вот как:

Как указано в списке рассылки, spark-sql предполагает, что его классы загружаются изначальным загрузчиком классов. Это не тот случай, когда в Eclipse были библиотеки Java и Scala, загружались как часть пути класса загрузки, тогда как код пользователя и его зависимости находятся в другом. Вы можете легко исправить это в диалоговом окне конфигурации запуска:

  • удалить Scala Библиотека и Scala Компилятор из записей "Bootstrap"
  • добавить (как внешние банки) scala-reflect, scala-library и scala-compiler к пользовательской записи.

Диалог должен выглядеть так:

enter image description here

Изменить: Исправлена ​​ошибка Исправлена ​​ошибка, и это обходное решение больше не нужно (начиная с версии 1.4.0)

Ответ 2

Мой код (после описания вашего первого метода) обычно работает в Spark 1.4.0-SNAPSHOT для этих двух конфигураций:

  • Intellij IDEA test
  • Spark Standalone cluster с 8 узлами (1 мастер, 7 человек)

Пожалуйста, проверьте, существуют ли какие-либо различия.

val bc = sc.broadcast(Array[String]("login3", "login4"))
val x = Array(("login1", 192), ("login2", 146), ("login3", 72))
val xdf = sqlContext.createDataFrame(x).toDF("name", "cnt")

val func: (String => Boolean) = (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)
val filtered = xdf.filter(sqlfunc(col("name")))

xdf.show()
filtered.show()

Выход

имя cnt
login1 192
login2 146
login3 72

имя cnt
login3 72