Подтвердить что ты не робот

Фильтр, основанный на другом RDD в Spark

Я хотел бы оставить только сотрудников, у которых есть идентификатор деления, упомянутый во второй таблице.

Employee table
LastName    DepartmentID
Rafferty    31
Jones   33
Heisenberg  33
Robinson    34
Smith   34

Department table
DepartmentID
31  
33  

Я пробовал следующий код, который не работает:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
employee.filter(lambda e: e[1] in department).collect()

Py4JError: An error occurred while calling o344.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

Любые идеи? Я использую Spark 1.1.0 с Python. Однако я бы принял ответ Scala или Python.

4b9b3361

Ответ 1

В этом случае то, что вы хотели бы достичь, - это фильтр в каждом разделе с данными, содержащимися в таблице отдела: Это было бы основным решением:

val dept = deptRdd.collect.toSet
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)}

Если ваши данные отдела велики, переменная широковещательной передачи повысит производительность, предоставляя данные один раз всем узлам, а не сериализуя их с каждой задачей.

val deptBC = sc.broadcast(deptRdd.collect.toSet)
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)}

Хотя использование соединения будет работать, это очень дорогое решение, так как для достижения соединения требуется распределенная перетасовка данных (byKey). Учитывая, что требование является простым фильтром, отправка данных в каждый раздел (как показано выше) обеспечит гораздо лучшую производительность.

Ответ 2

Наконец я реализовал решение, используя соединение. Мне пришлось добавить значение 0 в отдел, чтобы избежать исключения из Spark:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
# invert id and name to get id as the key
employee = sc.parallelize(employee).map(lambda e: (e[1],e[0]))
# add a 0 value to avoid an exception
department = sc.parallelize(department).map(lambda d: (d,0))

employee.join(department).map(lambda e: (e[1][0], e[0])).collect()

output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)]

Ответ 3

Фильтрация нескольких значений в нескольких столбцах:

В случае, когда вы извлекаете данные из базы данных (тип Hive или SQL типа db для этого примера) и должны фильтроваться по нескольким столбцам, проще всего загрузить таблицу с первым фильтром, а затем повторить фильтры через RDD (несколько небольших итераций - это поощряемый способ программирования Spark):

{
    import org.apache.spark.sql.hive.HiveContext
    val hc = new HiveContext(sc)

    val first_data_filter = hc.sql("SELECT col1,col2,col2 FROM tableName WHERE col3 IN ('value_1', 'value_2', 'value_3)")
    val second_data_filter = first_data_filter.filter(rdd => rdd(1) == "50" || rdd(1) == "20")
    val final_filtered_data = second_data_filter.filter(rdd => rdd(0) == "1500")

}

Конечно, вы должны знать свои данные немного, чтобы фильтровать правильные значения, но эта часть процесса анализа.

Ответ 4

для одного и того же exm выше, я хотел бы оставить только сотрудников, которые содержали или в идентификаторе департамента, упомянутом во второй таблице. но это не должно быть никакой операции присоединения, я бы увидел ее в "содержащем" или "в", i означает, что 33 находится в "334 и 335

employee = [['Raffery',311], ['Jones',334], ['Heisenberg',335], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)