Подтвердить что ты не робот

Переименование столбцов для агрегатов данных pyspark

Я анализирую некоторые данные с фреймами данных pyspark, предположим, что у меня есть dataframe df, который я агрегирую:

df.groupBy("group")\
  .agg({"money":"sum"})\
  .show(100)

Это даст мне:

group                SUM(money#2L)
A                    137461285853
B                    172185566943
C                    271179590646

Агрегирование работает отлично, но мне не нравится новое имя столбца "СУММ (деньги # 2L)". Есть ли простой способ переименовать этот столбец в нечто, читаемое человеком из метода .agg? Возможно, что-то более похожее на то, что можно было бы сделать в dplyr:

df %>% group_by(group) %>% summarise(sum_money = sum(money))
4b9b3361

Ответ 1

Хотя я по-прежнему предпочитаю синтаксис dplyr, этот фрагмент кода будет делать:

import pyspark.sql.functions as sf

df.groupBy("group")\
  .agg(sf.sum('money').alias('money'))\
  .show(100)

Он становится многословным.

Ответ 2

withColumnRenamed должен сделать трюк. Вот ссылка на pyspark.sql API.

df.groupBy("group")\
  .agg({"money":"sum"})\
  .withColumnRenamed("SUM(money)", "money")
  .show(100)

Ответ 3

Я сделал для этого небольшую вспомогательную функцию, которая может помочь некоторым людям.

import re

from functools import partial

def rename_cols(agg_df, ignore_first_n=1):
    """changes the default spark aggregate names 'avg(colname)' 
    to something a bit more useful. Pass an aggregated dataframe
    and the number of aggregation columns to ignore.
    """
    delimiters = "(", ")"
    split_pattern = '|'.join(map(re.escape, delimiters))
    splitter = partial(re.split, split_pattern)
    split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
    renamed = map(split_agg, agg_df.columns[ignore_first_n:])
    renamed = zip(agg_df.columns[ignore_first_n:], renamed)
    for old, new in renamed:
        agg_df = agg_df.withColumnRenamed(old, new)
    return agg_df

Пример:

gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
 .groupby("id")
 .agg({"rank": "mean",
       "*": "count",
       "rate": "mean", 
       "price": "mean", 
       "clicks": "mean", 
       })
)

>>> gb.columns
['id',
 'avg(rate)',
 'count(1)',
 'avg(price)',
 'avg(rank)',
 'avg(clicks)']

>>> rename_cols(gb).columns
['id',
 'avg_rate',
 'count_1',
 'avg_price',
 'avg_rank',
 'avg_clicks']

Делать хоть немного, чтобы уберечь людей от печатания так много.

Ответ 4

df = df.groupby('Device_ID').agg(aggregate_methods)
for column in df.columns:
    start_index = column.find('(')
    end_index = column.find(')')
    if (start_index and end_index):
        df = df.withColumnRenamed(column, column[start_index+1:end_index])

Приведенный выше код может удалить все, что находится за пределами "()". Например, "sum (foo)" будет переименован в "foo".

Ответ 5

Это просто как:

 val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()

Используйте .as в agg, чтобы назвать новую созданную строку.

Ответ 6

Учитывая, что у вас есть словарь columns_and_operations и, после агрегирования, вы хотите сделать переименование без columns_and_operations, более простым способом будет:

from functools import reduce

columns_and_operations = {
        "rank": "mean",
        "*": "count",
        "rate": "mean", 
        "price": "mean", 
         "clicks": "mean"}

df = df.groupBy("group").agg(columns_and_operations)

old_names = ["{}({})".format(v, k) for k, v in columns_and_operations.items()]
new_names = list(columns_and_operations.keys())

df = reduce(lambda df, i: df.withColumnRenamed(old_names[i],
                                               new_names[i]),
            range(len(old_names)),
            df)

Ответ 7

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName('test').getOrCreate()
data = [(1, "siva", 100), (2, "siva2", 200),(3, "siva3", 300),(4, "siva4", 400),(5, "siva5", 500)]
schema = ['id', 'name', 'sallary']

df = spark.createDataFrame(data, schema=schema)
df.show()
+---+-----+-------+
| id| name|sallary|
+---+-----+-------+
|  1| siva|    100|
|  2|siva2|    200|
|  3|siva3|    300|
|  4|siva4|    400|
|  5|siva5|    500|
+---+-----+-------+


**df.agg({"sallary": "max"}).withColumnRenamed('max(sallary)', 'max').show()**
+---+
|max|
+---+
|500|
+---+